diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..52f327cdb1f62f14570335d28f35fc9a336de4b0
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+artifact-overview.pdf
diff --git a/artifact-overview.typ b/artifact-overview.typ
index 5a027b6e9549322c34c1f897cc8a8e7c607e31c8..f84abd20b76b4d92416a93f12f09bae4f8511828 100644
--- a/artifact-overview.typ
+++ b/artifact-overview.typ
@@ -281,12 +281,11 @@ The step-by-step instructions of this document can be used in several ways *depe
 + You can *check* the final analyses (code + plots) done in Article @lightpredenergy by reading the provided pre-rendered notebooks available on #link(zenodo-url)[Zenodo].
 + You can *reproduce* the *final analyses* by first downloading the provided aggregated results of the experiments from #link(zenodo-url)[Zenodo], and then by running the notebooks yourself. This enables you to *edit* our notebooks before running them, so that you can modify the analyses or add your own.
-  // - Refer to #todo[link to Danilo's notebook section] for the machine learning experiment.
+  - Refer to @sec-analyze-prediction-results for instructions to analyze the results of the machine learning experiment.
   - Refer to @sec-analyze-simu-campaign-results for instructions to analyze the results of the scheduling experiment.
 + You can *reproduce* our *experimental campaigns* by downloading the provided experiment input files from #link(zenodo-url)[Zenodo], and then by running the experiment yourself. This enables you to check that our experiments can be reproduced with the *exact same parameters and configuration*.
-  //- Refer to #todo[link to Danilo's expe section?] for the machine learning experiment.
   - Refer to @sec-run-simu-campaign for instructions to reproduce the scheduling experiment.
 + You can *fully reproduce* our *experimental campaigns* by downloading original traces of the Marconi100, by generating the experimental campaign parameters yourself (enabling you to hack the provided command-line parameters or code),
@@ -294,19 +293,6 @@ The step-by-step instructions of this document can be used in several ways *depe
 You can follow all steps below in this case, but *please do note that this is disk/bandwidth/computation-intensive.*
-== Job power prediction <sec-job-power-pred>
-How to reproduce the power predictions of jobs has not been written yet.
-The expected output data of this section has however been stored on #link(zenodo-url)[Zenodo].
-
-//#tododanilo[how to reproduce this experiment?]
-
-#fullbox(footer: [Disk: 82 Mo.])[
-  #filehashes((
-    "fdcc47998a7e998abde325162833b23e", "power_pred_users_allmethods_max.tar.gz",
-    "954f782a75c9a5b21c53a95c0218e220", "power_pred_users_allmethods_mean.tar.gz",
-  ))
-]
-
 == Analysis and modeling of the power behavior of Marconi100 nodes
 === Get power and job Marconi100 traces on your disk <sec-m100-power-job-traces>
 This section downloads parts of the Marconi100 trace as archives from #link("https://gitlab.com/ecs-lab/exadata")[the ExaData Zenodo files], checks that the archives have the right content (via MD5 checksums), extracts the data needed by later stages of the pipeline (node power usage traces, job information traces), then finally removes unneeded extracted files and the downloaded archives.
@@ -363,7 +349,7 @@ The notebook also generates a power model of the Marconi100 nodes, which is requ
 Required input files.
 - `m100-data/22-agg_power_total.csv` (output of @sec-agg-power-traces-per-node).
-#fullbox(footer:[Disk: 1.7 Mo. Time (laptop): 00:00:10])[
+#fullbox(footer:[Disk: 1.7 Mo. Time (laptop): 00:00:10.])[
 ```sh
 nix develop .#r-notebook --command \
   Rscript notebooks/run-rmarkdown-notebook.R \
@@ -381,6 +367,147 @@ Required input files.
 Their content should be completely reproducible though.
 ]
+== Job power prediction <sec-job-power-pred>
+
+The experimental workflow consists of two parts: (i) preprocessing of the original data, and
+(ii) prediction of the mean and maximum power consumption.
+Please note that reproducing this section involves *heavy computations* and *big data*.
+We have *not* made intermediate files available on #link(zenodo-url)[Zenodo] as they were too large.
+
+=== Pre-processing
+
+==== Step 1
+
+#fullbox(footer:[#emph-overhead[Memory: 128 Go. Time (sequential): 18:00:00.]])[
+```sh
+for month in 22-01 22-02 22-03 22-04 22-05 22-06 22-07 22-08 22-09; do
+  nix develop .#py-scripts --command m100-pred-preprocess1 \
+    -j ./m100-data/${month}_jobs.parquet \
+    -m ./m100-data/${month}_power_total.parquet
+done
+```
+]
+
+==== Step 2
+
+#fullbox(footer:[#emph-overhead[Memory: 128 Go. Time (sequential): 66:00:00.]])[
+```sh
+for month in 22-01 22-02 22-03 22-04 22-05 22-06 22-07 22-08 22-09; do
+  nix develop .#py-scripts --command m100-pred-preprocess2 \
+    -js ./m100-data/${month}_filter12_singlenode.csv \
+    -jm ./m100-data/${month}_filter12_multinode.csv \
+    -m ./m100-data/${month}_power_total.parquet
+done
+```
+]
+
+=== Aggregate step 2 output into a single file
+
+#fullbox(footer: [#emph-overhead[Disk: 32 Go.]])[
+```sh
+find . -name '*filter123*' | \
+  tar -zcvf exadata_job_energy_profiles.tar.gz --files-from -
+```
+]
+
+=== Compute power metrics and add job information
+
+#fullbox(footer: [#emph-overhead[Disk: 32 Go.]])[
+```sh
+for month in 22-01 22-02 22-03 22-04 22-05 22-06 22-07 22-08 22-09; do
+  nix develop .#py-scripts --command m100-pred-jobs-extract-power-metrics \
+    -d ./m100-data/${month}
+done
+```
+]
+
+=== Merge files into a single CSV file
+
+This outputs the `filter123_all_jobs_aggmetrics.csv.gz` file needed by the prediction scripts.
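As a reading aid, the merge step amounts to the following Python sketch (the packaged `m100-pred-merge-jobfiles` entry point, provided in `scripts-py/expe_energumen/m100_pred_merge_jobfiles.py`, does essentially this; the paths are illustrative):

```python
# Concatenate the per-month *_filter123_aggmetrics.csv tables into one
# compressed CSV consumed by the prediction scripts.
import glob

import pandas as pd

def merge_jobfiles(rootdir: str) -> pd.DataFrame:
    # gather every month's aggregated-metrics table under rootdir
    jobfiles = sorted(glob.glob(rootdir + "*_filter123_aggmetrics.csv"))
    df_jobs = pd.concat(
        [pd.read_csv(f) for f in jobfiles]
    ).reset_index(drop=True)
    # pandas infers gzip compression from the .gz extension
    df_jobs.to_csv(rootdir + "/filter123_all_jobs_aggmetrics.csv.gz",
                   index=False)
    return df_jobs
```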
+
+#fullbox(footer: [Disk: 82 Mo.])[
+
+```sh
+nix develop .#py-scripts --command m100-pred-merge-jobfiles -d ./m100-data/
+```
+]
+
+=== Predicting job mean and maximum power consumption
+
+#fullbox(footer:[#emph-overhead[Memory: 128 Go. Time (sequential): 72:00:00.]])[
+  ```sh
+  mkdir ./m100-data/total_power_mean_predictions_users_allmethods_mean
+  mkdir ./m100-data/total_power_mean_predictions_users_allmethods_max
+
+  nix develop .#py-scripts --command \
+    run-prediction-per-user-allmethods-mean \
+    -j ./m100-data/filter123_all_jobs_aggmetrics.csv.gz \
+    -o ./m100-data/total_power_mean_predictions_users_allmethods_mean
+
+  nix develop .#py-scripts --command \
+    run-prediction-per-user-allmethods-max \
+    -j ./m100-data/filter123_all_jobs_aggmetrics.csv.gz \
+    -o ./m100-data/total_power_mean_predictions_users_allmethods_max
+  ```
+]
+
+=== Compressing prediction output into single files
+The expected output data has been stored on #link(zenodo-url)[Zenodo].
+
+#fullbox(footer:[Disk: 82 Mo.])[
+  ```sh
+  tar -cvzf ./m100-data/power_pred_users_allmethods_max.tar.gz \
+    ./m100-data/total_power_mean_predictions_users_allmethods_max
+  tar -cvzf ./m100-data/power_pred_users_allmethods_mean.tar.gz \
+    ./m100-data/total_power_mean_predictions_users_allmethods_mean
+  ```
+
+  #filehashes((
+    "fdcc47998a7e998abde325162833b23e", "power_pred_users_allmethods_max.tar.gz",
+    "954f782a75c9a5b21c53a95c0218e220", "power_pred_users_allmethods_mean.tar.gz",
+  ))
+]
+
+== Analyzing prediction results <sec-analyze-prediction-results>
+This analysis requires that the two job power prediction archives (outputs of @sec-job-power-pred, available on #link(zenodo-url)[Zenodo]) are available on your disk in the `./user-power-predictions` directory.
+The following command populates the `./user-power-predictions/data` directory by extracting the archives and uncompressing all the required files on your disk.
+
+#fullbox(footer: [Disk: 519 Mo.
Time: 00:00:05.])[ + ```sh + mkdir ./user-power-predictions/data + nix develop .#merge-m100-power-predictions --command \ + tar xf ./user-power-predictions/*mean.tar.gz --directory ./user-power-predictions/data + nix develop .#merge-m100-power-predictions --command \ + tar xf ./user-power-predictions/*max.tar.gz --directory ./user-power-predictions/data + nix develop .#merge-m100-power-predictions --command \ + gunzip ./user-power-predictions/data/*/*.gz + ``` +] + +The analysis of the predictions, which also generates Figures 2 and 3 of Article @lightpredenergy, can be reproduced with the following command. + +#fullbox(footer:[Time (laptop): 00:00:20.])[ + ```sh + nix develop .#r-py-notebook --command \ + Rscript notebooks/run-rmarkdown-notebook.R \ + notebooks/prediction-results-analysis.Rmd + ``` + + #filehashes(( + "6ce534dd1bf017444f05c354a1b3b767", "notebooks/fig2a-distrib-job-power-mean.svg", + "0b1cdfcf017c2cba7057a544e19cd698", "notebooks/fig2b-distrib-job-power-max.svg", + "0bc88e65ae495a8d6ec7d3cbcfca12ae", "notebooks/fig3a-pred-mape-mean-power.svg", + "a19b1a7c5dc72ec73a5349d85fc68fa3", "notebooks/fig3b-pred-mape-max-power.svg", + "04c2d5ef412b791a4d5515ec0719b3d0", "notebooks/prediction-results-analysis.html", + ), fill: (x, y) => { + if y > 0 { red.lighten(80%) } + }, + ) + + We could not make HTML notebooks and Python-generated images binary reproducible despite our best efforts. + Their content should be completely reproducible though. +] + == Job scheduling with power prediction <sec-sched> This section shows how to reproduce Sections 6.4 and 6.5 of article @lightpredenergy. @@ -499,7 +626,7 @@ Required input files. - The `/tmp/wlds` directory (#emph-overhead[1.4 Go]) that contains all the workload files (output of @sec-gen-workloads). Please note that all input files can be downloaded from #link(zenodo-url)[Zenodo] if you have not generated them yourself. 
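Downloaded archives can be checked against the MD5 hashes listed throughout this document before extraction. A minimal Python sketch (the archive name and expected hash below are placeholders to adapt):

```python
# Stream a file through hashlib.md5 so large archives need not fit in
# memory, then compare the digest to the hash listed in this document.
import hashlib

def md5sum(path: str, chunk_size: int = 1 << 20) -> str:
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()

# usage (placeholder values):
#   assert md5sum("workloads.tar.xz") == "<expected-md5-from-this-document>"
```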
-In particular to populate the `/tmp/wlds`directory you can *download file* `workloads.tar.xz` and then *extract it* into `/tmp/` via a command such as the following. `tar xf workloads.tar.xz --directory=/tmp/` +In particular to populate the `/tmp/wlds` directory you can *download file* `workloads.tar.xz` and then *extract it* into `/tmp/` via a command such as the following. `tar xf workloads.tar.xz --directory=/tmp/` #fullbox(footer: [#emph-overhead[Disk: 7.6 Go.] Time: 00:06:00.])[ ```sh diff --git a/flake.nix b/flake.nix index ce41a85461231cb756a0b98362372a320bf4e755..80340f5ec05447d4618fcd329c34085ffac34b2a 100644 --- a/flake.nix +++ b/flake.nix @@ -70,6 +70,8 @@ propagatedBuildInputs = with pyPkgs; [ packages.fastparquet-2402 pyPkgs.sortedcontainers + pyPkgs.scipy + pyPkgs.scikit-learn ]; }; fastparquet-2402 = pyPkgs.fastparquet.overrideAttrs(final: prev: rec { @@ -83,7 +85,7 @@ }); easypower-sched-lib = easy-powercap-pkgs.easypower; }; - devShells = { + devShells = rec { download-m100-months = pkgs.mkShell { buildInputs = [ packages.python-scripts @@ -125,6 +127,14 @@ pkgs.rPackages.viridis ]; }; + r-py-notebook = pkgs.mkShell { + buildInputs = r-notebook.buildInputs ++ [ + pkgs.rPackages.reticulate + pyPkgs.pandas + pyPkgs.seaborn + pyPkgs.scikit-learn + ]; + }; typst-shell = pkgs.mkShell { buildInputs = [ typstpkgs.typst-dev diff --git a/notebooks/prediction-results-analysis.Rmd b/notebooks/prediction-results-analysis.Rmd new file mode 100644 index 0000000000000000000000000000000000000000..e20a77bb435d13092d2c97891a8fb78939b22d24 --- /dev/null +++ b/notebooks/prediction-results-analysis.Rmd @@ -0,0 +1,265 @@ +--- +title: "Job power prediction result analysis" +author: "Danilo Carastan-Santos" +date: "2024-05-15" +output: + rmdformats::readthedown +--- + +## Processing the mean power prediction results +Outputs of script `run_prediction_per_user_allmethods_mean.py`. 
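As a reading aid for the notebook chunk that follows, the per-user MAPE aggregation boils down to the toy example below (user ids and watt values are made up; the notebook uses scikit-learn's `mean_absolute_percentage_error`, which computes the same quantity):

```python
# Per-user mean absolute percentage error on a toy prediction table.
import pandas as pd

def mape(y_true: pd.Series, y_pred: pd.Series) -> float:
    # MAPE = mean(|y_true - y_pred| / |y_true|)
    return float(((y_true - y_pred).abs() / y_true.abs()).mean())

df = pd.DataFrame({
    "user_id": ["u1", "u1", "u2", "u2"],
    "total_power_mean_watts": [100.0, 200.0, 400.0, 500.0],
    "hist_pred_total_power_mean": [110.0, 180.0, 400.0, 450.0],
})

# one MAPE value per user, as in the boxplots of Figure 3
per_user_mape = {
    user: mape(g["total_power_mean_watts"], g["hist_pred_total_power_mean"])
    for user, g in df.groupby("user_id")
}
```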
+ +```{python} +import pandas as pd +import seaborn as sns + +import os + +RESULTS_PATH = "../user-power-predictions/data/total_power_mean_predictions_users_allmethods_mean/" +PRED_COLS = ["hist_pred_total_power_mean", + "LinearRegression_total_power_mean_watts", + "RandomForestRegressor_total_power_mean_watts", + "LinearSVR_total_power_mean_watts", + "SGDRegressor_total_power_mean_watts"] + + +result_filenames = os.listdir(RESULTS_PATH) + +df_all_results = pd.concat([pd.read_csv(RESULTS_PATH+filename, low_memory=False) for filename in result_filenames]) + +df_all_results = df_all_results.dropna(subset=PRED_COLS) +df_all_results + + +from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error + +lst_users = df_all_results["user_id"].drop_duplicates().to_list() +#print(lst_users) + +df_results_user_group = df_all_results.groupby("user_id") + +lst_stats_per_user = [] + +for user in lst_users: + results_user = df_results_user_group.get_group(user) + hist_mape = mean_absolute_percentage_error(results_user["total_power_mean_watts"], results_user["hist_pred_total_power_mean"]) + LR_mape = mean_absolute_percentage_error(results_user["total_power_mean_watts"], results_user["LinearRegression_total_power_mean_watts"]) + RF_mape = mean_absolute_percentage_error(results_user["total_power_mean_watts"], results_user["RandomForestRegressor_total_power_mean_watts"]) + LSVR_mape = mean_absolute_percentage_error(results_user["total_power_mean_watts"], results_user["LinearSVR_total_power_mean_watts"]) + SGD_mape = mean_absolute_percentage_error(results_user["total_power_mean_watts"], results_user["SGDRegressor_total_power_mean_watts"]) + res = {"user_id": user, + "hist_mape": hist_mape, + "LinearRegression_mape": LR_mape, + "RandomForestRegressor_mape": RF_mape, + "LinearSVR_mape": LSVR_mape, + "SGDRegressor_mape": SGD_mape} + lst_stats_per_user.append(res) + #break + +df_stats_per_user = pd.DataFrame(lst_stats_per_user) +df_stats_per_user + 
+COLS = ["hist_mape","LinearRegression_mape","RandomForestRegressor_mape","LinearSVR_mape","SGDRegressor_mape"] +df_stats_per_user[COLS].describe() + +COLS = ["hist_mape","LinearRegression_mape","RandomForestRegressor_mape","LinearSVR_mape","SGDRegressor_mape"] +df_stats_per_user_pivot = pd.melt(df_stats_per_user, id_vars="user_id") +df_stats_per_user_pivot +``` + +### Figure 3 (a) +```{python} +import matplotlib.pyplot as plt + +TINY_SIZE = 2 +SMALL_SIZE = 5 +MEDIUM_SIZE = 20 +BIGGER_SIZE = 50 +FIG_WIDTH = 40 +FIG_HEIGHT = 10 + +#plt.rc('font', size=16) # controls default text sizes +plt.rc('font', size=20) # controls default text sizes +plt.rc('axes', titlesize=MEDIUM_SIZE) # fontsize of the axes title +plt.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of the x and y labels +plt.rc('xtick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels +plt.rc('ytick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels +plt.rc('legend', fontsize=MEDIUM_SIZE) # legend fontsize +plt.rc('figure', titlesize=MEDIUM_SIZE) # fontsize of the figure title +plt.rc('figure', figsize=(8,4)) + +#g = sns.boxplot(x="variable", y="value", data=df_stats_per_user_pivot, showfliers=False) +#plt.xticks(ticks=[0,1,2,3,4],labels=["History", "LinearRegression", "RandomForest", "LinearSVR", "SGDRegressor"],rotation=30) +g = sns.boxplot(y="variable", x="value", data=df_stats_per_user_pivot, showfliers=False) +plt.yticks(ticks=[0,1,2,3,4],labels=["History", "LinearRegression", "RandomForest", "LinearSVR", "SGDRegressor"],rotation=0) + +g.set_ylabel("Prediction Method") +g.set_xlabel("Mean Absolute Percentage Error (MAPE) ") +plt.tight_layout(pad=0) +plt.savefig("./fig3a-pred-mape-mean-power.svg") +``` + +## Processing the max power prediction results +Outputs of script `run_prediction_per_user_allmethods_max.py`. 
+ +```{python} +import pandas as pd +import seaborn as sns + +import os + +RESULTS_PATH = "../user-power-predictions/data/total_power_mean_predictions_users_allmethods_max/" + +PRED_COLS = ["hist_pred_total_power_max", + "LinearRegression_total_power_max_watts", + "RandomForestRegressor_total_power_max_watts", + "LinearSVR_total_power_max_watts", + "SGDRegressor_total_power_max_watts"] + + +result_filenames = os.listdir(RESULTS_PATH) + +df_all_results = pd.concat([pd.read_csv(RESULTS_PATH+filename, low_memory=False) for filename in result_filenames]) + +df_all_results = df_all_results.dropna(subset=PRED_COLS) +#df_all_results + + +from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error + +lst_users = df_all_results["user_id"].drop_duplicates().to_list() +#print(lst_users) + +df_results_user_group = df_all_results.groupby("user_id") + +lst_stats_per_user = [] + +for user in lst_users: + results_user = df_results_user_group.get_group(user) + hist_mape = mean_absolute_percentage_error(results_user["total_power_max_watts"], results_user["hist_pred_total_power_max"]) + LR_mape = mean_absolute_percentage_error(results_user["total_power_max_watts"], results_user["LinearRegression_total_power_max_watts"]) + RF_mape = mean_absolute_percentage_error(results_user["total_power_max_watts"], results_user["RandomForestRegressor_total_power_max_watts"]) + LSVR_mape = mean_absolute_percentage_error(results_user["total_power_max_watts"], results_user["LinearSVR_total_power_max_watts"]) + SGD_mape = mean_absolute_percentage_error(results_user["total_power_max_watts"], results_user["SGDRegressor_total_power_max_watts"]) + res = {"user_id": user, + "hist_mape": hist_mape, + "LinearRegression_mape": LR_mape, + "RandomForestRegressor_mape": RF_mape, + "LinearSVR_mape": LSVR_mape, + "SGDRegressor_mape": SGD_mape} + lst_stats_per_user.append(res) + #break + +df_stats_per_user = pd.DataFrame(lst_stats_per_user) +#df_stats_per_user + +COLS = 
["hist_mape","LinearRegression_mape","RandomForestRegressor_mape","LinearSVR_mape","SGDRegressor_mape"] +df_stats_per_user[COLS].describe() + +COLS = ["hist_mape","LinearRegression_mape","RandomForestRegressor_mape","LinearSVR_mape","SGDRegressor_mape"] +df_stats_per_user_pivot = pd.melt(df_stats_per_user, id_vars="user_id") +df_stats_per_user_pivot +``` + +### Figure 3 (b) +```{python} +import matplotlib.pyplot as plt + +TINY_SIZE = 2 +SMALL_SIZE = 5 +MEDIUM_SIZE = 20 +BIGGER_SIZE = 50 +FIG_WIDTH = 40 +FIG_HEIGHT = 10 + + +plt.rc('font', size=20) # controls default text sizes +plt.rc('axes', titlesize=MEDIUM_SIZE) # fontsize of the axes title +plt.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of the x and y labels +plt.rc('xtick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels +plt.rc('ytick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels +plt.rc('legend', fontsize=MEDIUM_SIZE) # legend fontsize +plt.rc('figure', titlesize=MEDIUM_SIZE) # fontsize of the figure title +plt.rc('figure', figsize=(8,4)) + +#g = sns.boxplot(x="variable", y="value", data=df_stats_per_user_pivot, showfliers=False) +#plt.xticks(ticks=[0,1,2,3,4],labels=["History", "LinearRegression", "RandomForest", "LinearSVR", "SGDRegressor"],rotation=30) +#g.set_xlabel("Prediction Method") +#g.set_ylabel("Mean Absolute Percentage Error (MAPE) ") + +g = sns.boxplot(y="variable", x="value", data=df_stats_per_user_pivot, showfliers=False) +plt.yticks(ticks=[0,1,2,3,4],labels=["History", "LinearRegression", "RandomForest", "LinearSVR", "SGDRegressor"],rotation=0) +g.set_ylabel("Prediction Method") +g.set_xlabel("Mean Absolute Percentage Error (MAPE)") +plt.tight_layout(pad=0) +plt.savefig("./fig3b-pred-mape-max-power.svg") +``` + +## Getting the actual mean and max power distributions +### Mean: Figure 2 (a) +```{python} +import matplotlib.pyplot as plt +import seaborn as sns + +TINY_SIZE = 2 +SMALL_SIZE = 5 +MEDIUM_SIZE = 20 +BIGGER_SIZE = 50 +FIG_WIDTH = 40 +FIG_HEIGHT = 10 + +plt.clf() + 
+plt.rc('figure', figsize=(8, 6)) +plt.rc('font', size=MEDIUM_SIZE) # controls default text sizes +plt.rc('axes', titlesize=MEDIUM_SIZE) # fontsize of the axes title +plt.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of the x and y labels +plt.rc('xtick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels +plt.rc('ytick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels +plt.rc('legend', fontsize=MEDIUM_SIZE) # legend fontsize +plt.rc('figure', titlesize=MEDIUM_SIZE) # fontsize of the figure title +plt.rc('figure', figsize=(6,4)) + +g = sns.histplot(x="total_power_mean_watts", data=df_all_results, bins=25, fill=False) +#g.ax.set_yscale('log') +g.set_xlabel("Total Power (watts)") +g.set_ylabel("Number of Jobs") +plt.xticks(ticks=[0,250,500,750,1000,1250,1500], rotation=30) +plt.tight_layout(pad=0) +plt.savefig("./fig2a-distrib-job-power-mean.svg") +``` + +### Max : Figure 2 (b) +```{python} +import matplotlib.pyplot as plt +import seaborn as sns + +TINY_SIZE = 2 +SMALL_SIZE = 5 +MEDIUM_SIZE = 20 +BIGGER_SIZE = 50 +FIG_WIDTH = 40 +FIG_HEIGHT = 10 + +plt.clf() + +plt.rc('figure', figsize=(8, 6)) +plt.rc('font', size=MEDIUM_SIZE) # controls default text sizes +plt.rc('axes', titlesize=MEDIUM_SIZE) # fontsize of the axes title +plt.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of the x and y labels +plt.rc('xtick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels +plt.rc('ytick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels +plt.rc('legend', fontsize=MEDIUM_SIZE) # legend fontsize +plt.rc('figure', titlesize=MEDIUM_SIZE) # fontsize of the figure title +plt.rc('figure', figsize=(6,4)) + +#g = sns.displot(x="total_power_max_watts", data=df_all_results) +g = sns.histplot(x="total_power_max_watts", data=df_all_results, bins=25, fill=False) + +#g.ax.set_yscale('log') +g.set_xlabel("Total Power (watts)") +g.set_ylabel("Number of Jobs") +plt.xticks(ticks=[0,250,500,750,1000,1250,1500,1750,2000], rotation=30) +plt.tight_layout(pad=0) 
+plt.savefig("./fig2b-distrib-job-power-max.svg") +``` diff --git a/scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py b/scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py new file mode 100644 index 0000000000000000000000000000000000000000..9e3025ac58cc5cde4cd803b157bea4d4b68e2d2a --- /dev/null +++ b/scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py @@ -0,0 +1,77 @@ +import argparse +import sys +import pandas as pd +import numpy as np +from scipy.stats import iqr + +""" +Read input data +""" +def read_data(rootdir): + df_jobs_single = pd.read_csv(rootdir+"_filter123_singlenode.csv") + df_jobs_multi = pd.read_csv(rootdir+"_filter123_multinode.csv") + df_power_single = pd.read_csv(rootdir+"_filter123_singlenode.csv") + df_power_multi = pd.read_csv(rootdir+"_filter123_multinode.csv") + df_jobs = pd.concat([df_jobs_single, df_jobs_multi]).reset_index(drop=True) + df_power = pd.concat([df_power_single, df_power_multi]).reset_index(drop=True) + df_power['node'] = pd.to_numeric(df_power['node']) + df_power['value'] = pd.to_numeric(df_power['value']) + return df_jobs, df_power + + +""" +Calculate jobs' power aggregation metrics +""" +def calculate_agg_metrics(df_jobs, df_power): + powertrace_jobs = [df_power[df_power["job_id"]==x]['value'].values for x in df_jobs['job_id'].values] + df_jobs["total_power_max_watts"] = [x.max() for x in powertrace_jobs] + df_jobs["total_power_mean_watts"] = [x.mean() for x in powertrace_jobs] + df_jobs["total_power_median_watts"] = [np.median(x) for x in powertrace_jobs] + df_jobs["total_power_std_watts"] = [x.std() for x in powertrace_jobs] + df_jobs["total_power_iqr_watts"] = [iqr(x, rng=(25, 75)) for x in powertrace_jobs] + df_jobs["total_power_1090range_watts"] = [iqr(x, rng=(10, 90)) for x in powertrace_jobs] + return df_jobs + + +""" +Save results +""" +def save_results(df_jobs_aggmetrics, rootdir): + df_jobs_aggmetrics.to_csv(rootdir+"_filter123_aggmetrics.csv") + +""" +Run workflow +""" 
+def run_workflow(rootdir):
+    df_jobs, df_power = read_data(rootdir)
+    df_jobs_aggmetrics = calculate_agg_metrics(df_jobs, df_power)
+    print(df_jobs_aggmetrics[["run_time",
+                              "num_nodes",
+                              "total_power_max_watts",
+                              "total_power_mean_watts",
+                              "total_power_median_watts"]])
+    save_results(df_jobs_aggmetrics, rootdir)
+
+"""
+Read Command line interface
+"""
+def read_cli():
+    # Make parser object
+    p = argparse.ArgumentParser(description='Process ExaData data to extract per-job power metrics. This script uses the output from process_marconi_jobs_2.py')
+
+    p.add_argument("--rootdir", "-d", type=str, required=True,
+                   help="Root directory of the trace")
+
+    return(p.parse_args())
+
+def main():
+    if sys.version_info<(3,5,0):
+        sys.stderr.write("You need python 3.5 or later to run this script\n")
+        sys.exit(1)
+
+    args = read_cli()
+    run_workflow(args.rootdir)
+
+if __name__ == '__main__':
+    main()
diff --git a/scripts-py/expe_energumen/m100_pred_merge_jobfiles.py b/scripts-py/expe_energumen/m100_pred_merge_jobfiles.py
new file mode 100644
index 0000000000000000000000000000000000000000..bad85e3f2ee1889974cc2f7a96da6c2936ddc3e8
--- /dev/null
+++ b/scripts-py/expe_energumen/m100_pred_merge_jobfiles.py
@@ -0,0 +1,55 @@
+import glob
+import pandas as pd
+import sys
+import argparse
+
+"""
+Read job files spread in the months folders
+"""
+def read_jobfiles(rootdir):
+    jobfiles_list = glob.glob(rootdir+"*"+"_filter123_aggmetrics.csv")
+
+    df_jobs = pd.concat([pd.read_csv(jobfile) for jobfile in jobfiles_list]).reset_index(drop=True)
+    return df_jobs
+
+
+"""
+Save results to compressed csv
+"""
+def save_results(df_jobs, rootdir):
+    df_jobs.to_csv(rootdir+"/filter123_all_jobs_aggmetrics.csv.gz", index=False)
+
+
+"""
+Run workflow
+"""
+def run_workflow(rootdir):
+    df_jobs = read_jobfiles(rootdir)
+    save_results(df_jobs, rootdir)
+
+
+"""
+Read Command 
line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Concatenate job table files into a single csv file') + + p.add_argument("--rootdir", "-d", type=str, required=True, + help="Root directory of the trace") + + return(p.parse_args()) + +def main(): + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + args = read_cli() + run_workflow(args.rootdir) + +if __name__ == '__main__': + main() diff --git a/scripts-py/expe_energumen/m100_pred_preprocess_1.py b/scripts-py/expe_energumen/m100_pred_preprocess_1.py new file mode 100644 index 0000000000000000000000000000000000000000..7eb48462642c32a8cd2b95f0fd08bd3eb62134ae --- /dev/null +++ b/scripts-py/expe_energumen/m100_pred_preprocess_1.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +import sys +import argparse +import ast +import pandas as pd +import numpy as np + +""" +Read Data +""" +def read_data(jobfile, metricfile): + df_jobs = pd.read_parquet(jobfile) + df_power = pd.read_parquet(metricfile) + ## I call here df_power, but in reality it can be any metric + return df_jobs, df_power + + +""" +Convert some string values to numeric +Convert timestamps to seconds for job runtime +""" +def preprocess_data(df_jobs, df_power): + df_power['node'] = pd.to_numeric(df_power['node']) + df_power['value'] = pd.to_numeric(df_power['value']) + df_jobs["run_time"] = (df_jobs['end_time']-df_jobs['start_time']) / np.timedelta64(1, 's') + print("Total number of jobs: ", len(df_jobs)) + return df_jobs, df_power + + +""" +Filter 1: remove jobs that take less than a minute +""" +def filter1(df_jobs): + SECONDS_ONE_MINUTE = 60.0 + + df_jobs_f1 = df_jobs.loc[df_jobs["run_time"] >= SECONDS_ONE_MINUTE] + print("Number of jobs after filter 1: ", str(len(df_jobs_f1))) + return df_jobs_f1 + + +""" +Filter 2: remove jobs with no energy profile +Method for single-node jobs +""" +def filter2_single(df_jobs, df_power): + df_jobs_f1_single 
= df_jobs[df_jobs["num_nodes"]==1] + + #removing malformed "nodes" + df_jobs_f1_single = df_jobs_f1_single[df_jobs_f1_single["nodes"].str.match("\[\d+\]")].copy().reset_index(drop=True) + df_jobs_f1_single["node"] = [ast.literal_eval(x)[0] for x in df_jobs_f1_single['nodes']] + + #use this for smaller inputs + #sample = df_jobs_f1_single[0:10000].copy().reset_index(drop=True) + + sample = df_jobs_f1_single + + group = df_power.groupby(by="node") + + available_nodes = group.groups.keys() + + #for debugging + #result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()] + + #i know that 0 is node, 1 is start_time, and 2 is end_time + result = [any(group.get_group(x[0])["timestamp"].between(x[1], x[2])) if x[0] in available_nodes else False for x in sample[["node", "start_time", "end_time"]].values.tolist()] + + sample["has_profile"] = result + df_jobs_f12_single = sample[sample["has_profile"]==True] + + print("Number of jobs after filter 2 - single: ", str(len(df_jobs_f12_single))) + return df_jobs_f12_single + + +""" +Filter 2: remove jobs with no energy profile +Method for multi-node jobs +""" +def filter2_multi(df_jobs, df_power): + df_jobs_f1_multi = df_jobs[df_jobs["num_nodes"] > 1].copy().reset_index(drop=True) + + #removing malformed "nodes" + df_jobs_f1_multi["node"] = [ast.literal_eval(x) for x in df_jobs_f1_multi['nodes']] + + #use this for smaller inputs + #sample = df_jobs_f1_multi[0:10].copy().reset_index(drop=True) + + sample = df_jobs_f1_multi + + group = df_power.groupby(by="node") + + available_nodes = set(group.groups.keys()) + + #for debugging + #result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()] + + #i know that 0 is node, 1 is start_time, and 2 is end_time + result = [all([any(group.get_group(y)["timestamp"].between(x[1], x[2])) for y in x[0]]) if 
set(x[0]).issubset(available_nodes) else False for x in sample[["node", "start_time", "end_time"]].values.tolist()]
+
+    sample["has_profile"] = result
+    df_jobs_f12_multi = sample[sample["has_profile"]==True]
+    print("Number of jobs after filter 2 - multi: ", str(len(df_jobs_f12_multi)))
+    return df_jobs_f12_multi
+
+
+"""
+Save intermediate results to csv
+"""
+def save_results(df_jobs_single, df_jobs_multi, jobfile, metricfile):
+    # str.rstrip would strip a character set, not the suffix, so remove
+    # the "_jobs.parquet" suffix explicitly
+    jobfile_out = jobfile.removesuffix("_jobs.parquet")
+    df_jobs_single.to_csv(jobfile_out+"_filter12_singlenode.csv", index=False)
+    df_jobs_multi.to_csv(jobfile_out+"_filter12_multinode.csv", index=False)
+
+"""
+Run workflow
+"""
+def run_workflow(metricfile, jobfile):
+    df_jobs, df_power = read_data(jobfile=jobfile, metricfile=metricfile)
+    df_jobs, df_power = preprocess_data(df_jobs=df_jobs, df_power=df_power)
+    df_jobs = filter1(df_jobs)
+    df_jobs_single = filter2_single(df_jobs, df_power)
+    df_jobs_multi = filter2_multi(df_jobs, df_power)
+    save_results(df_jobs_single, df_jobs_multi, jobfile, metricfile)
+
+"""
+Read Command line interface
+"""
+def read_cli():
+    # Make parser object
+    p = argparse.ArgumentParser(description='Process ExaData data to extract per-job energy profiles')
+
+    p.add_argument("--metricfile", "-m", type=str, required=True,
+                   help="Metric file")
+    p.add_argument("--jobfile", "-j", type=str, required=True,
+                   help="Job table file")
+
+    return(p.parse_args())
+
+def main():
+    if sys.version_info<(3,5,0):
+        sys.stderr.write("You need python 3.5 or later to run this script\n")
+        sys.exit(1)
+
+    args = read_cli()
+    run_workflow(args.metricfile, args.jobfile)
+
+if __name__ == '__main__':
+    main()
diff --git a/scripts-py/expe_energumen/m100_pred_preprocess_2.py b/scripts-py/expe_energumen/m100_pred_preprocess_2.py
new file mode 100644
index 0000000000000000000000000000000000000000..f10f994872a7b4626261c13839d42b7e6f4e820a
--- /dev/null
+++ b/scripts-py/expe_energumen/m100_pred_preprocess_2.py
@@ -0,0 +1,242 @@
+import 
argparse +import sys +import ast +import multiprocessing +import functools +import numpy as np +import pandas as pd + +""" +Read data +Output from process_marconi_jobs_1.py +""" +def read_data(jobfile_single, jobfile_multi, metricfile): + df_jobs_single = pd.read_csv(jobfile_single) + df_jobs_multi = pd.read_csv(jobfile_multi) + + ## Here i refer to total_power but in reality it can be any metric + df_total_power = pd.read_parquet(metricfile) + df_total_power['node'] = pd.to_numeric(df_total_power['node']) + df_total_power['value'] = pd.to_numeric(df_total_power['value']) + return df_jobs_single, df_jobs_multi, df_total_power + + +""" +Filter 3: find the jobs that share a node +The idea is to add a new column to `df_total_power` +with a list of jobs that run on each node at each timestamp. +Then we filter jobs that appear sharing with another job +""" +def filter3_1_single(df_jobs_single, df_total_power): + #removing malformed "nodes" + df_jobs_single["node"] = [ast.literal_eval(x)[0] for x in df_jobs_single['nodes']] + + #use this for smaller inputs + #TEST_SAMPLE_SIZE = 1000 + + TEST_SAMPLE_SIZE = len(df_jobs_single) + sample = df_jobs_single[0:TEST_SAMPLE_SIZE].copy().reset_index(drop=True) + + #sample = df_jobs_single + + group = df_total_power.groupby(by="node") + + #for debugging + #result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()] + + #i know that x[0] is node, x[1] is start_time, and x[2] is end_time + #each element of the list is a subseries of the total_power of a certain mode + #with "1" at the timestamps where the "job_id" ran + result = [[group.get_group(x[0])["timestamp"].between(x[1], x[2]).astype(int).replace(0, np.nan).dropna().astype(str), x[3]] for x in sample[["node", "start_time", "end_time", "job_id"]].values.tolist()] + + # now i replace the "1"s with the "job_id" + #each element of the list is a subseries of the total_power of a certain mode + 
#with the "job_id" value at the timestamps where the "job_id" ran + result2 = [x[0].replace("1.0", str(x[1])) for x in result] + #print(result2) + + #finally, concatenate each of the series by index + # while joining the values where indices overlap + # the indices here are assumed to be consistent with the indices in df_total_power + #concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).replace(to_replace=r'^(0,)*0$', value="0", regex=True) + concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).rename("job_ids") + + joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="inner") + print(joined_total_power) + return joined_total_power + + +""" +Filter 3: find the jobs that do not share a node +Based on filter3_1_single, return the list of job ids that +did not share a node with another job +""" +def filter3_2(joined_total_power): + all_job_ids=[] + _ = pd.Series([[all_job_ids.append(y) for y in x.split(",")] for x in joined_total_power["job_ids"].drop_duplicates().values.tolist()]) + + all_job_ids = pd.Series(all_job_ids).drop_duplicates() + + nodeshare_job_ids = [] + _ = pd.Series([[nodeshare_job_ids.append(y) for y in x.split(",") if len(x.split(",")) > 1] for x in joined_total_power["job_ids"].drop_duplicates().values.tolist()]) + nodeshare_job_ids = pd.Series(nodeshare_job_ids).drop_duplicates() + + exclusive_job_ids = all_job_ids[~all_job_ids.isin(nodeshare_job_ids)] + print("Nodeshare Job ratio: ", len(nodeshare_job_ids)/len(all_job_ids)) + print(exclusive_job_ids) + return exclusive_job_ids + +""" +Filter 3: find the jobs that do not share a node +Get the power profile of the non-node-shared jobs +and the details of the exclusive jobs +""" +def filter3_3(df_jobs_single, exclusive_job_ids, joined_total_power): + result = [x for x in joined_total_power.values if np.any(pd.Series(x[3].split(",")).isin(exclusive_job_ids).values)] + df_total_power_exclusive_single = pd.DataFrame(result, 
columns=["timestamp", "value", "node", "job_id"]) + df_exclusive_jobs = df_jobs_single[df_jobs_single["job_id"].isin(exclusive_job_ids.astype(int))] + return df_total_power_exclusive_single, df_exclusive_jobs + +""" +Filter 3: Define the function to check the condition in parallel +""" +def check_condition_f3(x, exclusive_job_ids=None): + return np.any(pd.Series(x[3].split(",")).isin(exclusive_job_ids).values) + +""" +Filter 3: find the jobs that do not share a node +Get the power profile of the non-node-shared jobs +and the details of the exclusive jobs +Parallel version +""" +def filter3_3_par(df_jobs, exclusive_job_ids, joined_total_power): + # Use multiprocessing.Pool to parallelize the list comprehension + pool = multiprocessing.Pool() + result = pool.map(functools.partial(check_condition_f3, exclusive_job_ids=exclusive_job_ids), joined_total_power.values) + pool.close() + pool.join() + + # Filter the values based on the condition + result = [x for x, res in zip(joined_total_power.values, result) if res] + + df_total_power_exclusive_single = pd.DataFrame(result, columns=["timestamp", "value", "node", "job_id"]) + df_exclusive_jobs = df_jobs[df_jobs["job_id"].isin(exclusive_job_ids.astype(int))] + return df_total_power_exclusive_single, df_exclusive_jobs + + +""" +Filter 3: find the jobs that share a node +The idea is to add a new column to `df_total_power` +with a list of jobs that run on each node at each timestamp. 
+Then we filter out the jobs that share a node with another job +Version for multi-node jobs +""" +def filter3_1_multi(df_jobs_multi, df_total_power): + df_jobs_multi["node"] = [ast.literal_eval(x) for x in df_jobs_multi['nodes']] + + #use this for smaller inputs + #TEST_SAMPLE_SIZE = 1000 + + TEST_SAMPLE_SIZE = len(df_jobs_multi) + sample = df_jobs_multi[0:TEST_SAMPLE_SIZE].copy().reset_index(drop=True) + + #sample = df_jobs_multi + + group = df_total_power.groupby(by="node") + + #for debugging + #result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()] + + #note: x[0] is the node list, x[1] is start_time, and x[2] is end_time + #each element of the list is a subseries of the total_power of a certain node + #with "1" at the timestamps where the "job_id" ran + # the "0"s (where the timestamp is not between x[1] and x[2]) are replaced with NaNs just to remove these values with dropna + result = [[[group.get_group(y)["timestamp"].between(x[1], x[2]).astype(int).replace(0, np.nan).dropna().astype(str) for y in x[0]], x[3]] for x in sample[["node", "start_time", "end_time", "job_id"]].values.tolist()] + + #print(type(group.get_group(512))) + #group.get_group(x[0]) + + #result[0] = Series with timestamps with True where the job ran, and False otherwise + #result[1] = job id + #result.values.to_list() + + # now replace the "1"s with the "job_id" + #each element of the list is a subseries of the total_power of a certain node + #with the "job_id" value at the timestamps where the "job_id" ran + result2 = [pd.concat([y.replace("1.0", str(x[1])) for y in x[0]]) for x in result] + + #finally, concatenate each of the series by index + # while joining the values where indices overlap + # the indices here are assumed to be consistent with the indices in df_total_power + #concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).replace(to_replace=r'^(0,)*0$', value="0", regex=True) 
+ + concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).rename("job_ids") + #print(concatenated_series) + + # use the code below for the full run + #joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="left") + + # use the code below for a test run + joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="inner") + print(joined_total_power) + return joined_total_power + +""" +Save results to CSV +""" +def save_results(df_exclusive_jobs_single, df_exclusive_jobs_multi, df_total_power_exclusive_single, df_total_power_exclusive_multi, jobfile_single, metricfile): + # strip the exact suffixes (str.rstrip would strip a character set, not a suffix) + jobfile_out = jobfile_single[:-len("_filter12_singlenode.csv")] + metricfile_out = metricfile[:-len("power_total.parquet")] + df_exclusive_jobs_single.to_csv(jobfile_out+"_filter123_singlenode.csv", index=False) + df_exclusive_jobs_multi.to_csv(jobfile_out+"_filter123_multinode.csv", index=False) + df_total_power_exclusive_single.to_csv(metricfile_out+"a_0_filter123_singlenode.csv", index=False) + df_total_power_exclusive_multi.to_csv(metricfile_out+"a_0_filter123_multinode.csv", index=False) + + +""" +Run workflow +""" +def run_workflow(metricfile, jobfile_single, jobfile_multi): + df_jobs_single, df_jobs_multi, df_total_power = read_data(jobfile_single, jobfile_multi, metricfile) + #Single-node jobs workflow + joined_total_power = filter3_1_single(df_jobs_single, df_total_power) + exclusive_job_ids = filter3_2(joined_total_power) + #df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_3(df_jobs_single, exclusive_job_ids, joined_total_power) + df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_3_par(df_jobs_single, exclusive_job_ids, joined_total_power) + print(df_total_power_exclusive_single) + print(df_exclusive_jobs_single) + ############################### + #Multi-node jobs workflow + joined_total_power = filter3_1_multi(df_jobs_multi, 
df_total_power) + exclusive_job_ids = filter3_2(joined_total_power) + #df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_single_3(df_jobs_single, exclusive_job_ids, joined_total_power) + df_total_power_exclusive_multi, df_exclusive_jobs_multi = filter3_3_par(df_jobs_multi, exclusive_job_ids, joined_total_power) + print(df_total_power_exclusive_multi) + print(df_exclusive_jobs_multi) + save_results(df_exclusive_jobs_single, df_exclusive_jobs_multi, df_total_power_exclusive_single, df_total_power_exclusive_multi, jobfile_single, metricfile) + ############################### + +""" +Read Command line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Process ExaData data to extract per-job energy profiles') + + p.add_argument("--metricfile", "-m", type=str, required=True, + help="Metric file") + p.add_argument("--jobfilesingle", "-js", type=str, required=True, + help="Job table file for single-node jobs (output from process_marconi_jobs_1.py)") + p.add_argument("--jobfilemulti", "-jm", type=str, required=True, + help="Job table file for multi-node jobs (output from process_marconi_jobs_1.py)") + + return(p.parse_args()) + +def main(): + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + args = read_cli() + run_workflow(args.metricfile, args.jobfilesingle, args.jobfilemulti) + +if __name__ == '__main__': + main() diff --git a/scripts-py/expe_energumen/predict_jobs_power_allmethods_max.py b/scripts-py/expe_energumen/predict_jobs_power_allmethods_max.py new file mode 100644 index 0000000000000000000000000000000000000000..1b48ffcad58a329acc340c14e82e4914f43306fe --- /dev/null +++ b/scripts-py/expe_energumen/predict_jobs_power_allmethods_max.py @@ -0,0 +1,487 @@ +#!/usr/bin/env python3 +import sys +import argparse +import numpy as np +import pandas as pd +from sklearn.model_selection import train_test_split +from sklearn.linear_model import 
LinearRegression, Lasso, Ridge, SGDRegressor +from sklearn.svm import SVR, LinearSVR +from sklearn.ensemble import HistGradientBoostingRegressor, RandomForestRegressor +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import StandardScaler +from sklearn.model_selection import train_test_split, GridSearchCV +from sklearn.feature_selection import RFE +from sklearn.metrics import r2_score, mean_absolute_percentage_error, mean_absolute_error, mean_squared_error +from itertools import product + +""" +Initial variables +""" +## Time constants +NANOSECONDS_ONE_SECOND = 1e9 +SEC_ONE_MINUTE=60 +SEC_ONE_HOUR=60*SEC_ONE_MINUTE +SEC_ONE_DAY=24*SEC_ONE_HOUR +SEC_ONE_WEEK=7*SEC_ONE_DAY + +# Experiments parameters +## values for job submission time difference penalization +POWER_FACTORS={"sublinear": 0.5, + "linear": 1.0, + "quadratic": 2.0} +## Predict using only jobs with the same nb of resources +SAME_NB_RESOURCES={"yes": True, + "no": False} + + +## List of models used in the regression +MODELS=[ + {'key':'LinearRegression', 'value':LinearRegression, 'kwargs': {'n_jobs' : -1}}, + {'key':'RandomForestRegressor', 'value':RandomForestRegressor, 'kwargs': {'n_jobs' : -1}}, + {'key':'LinearSVR', 'value':LinearSVR, 'kwargs': {'max_iter' : 100000}}, + {'key':'SGDRegressor', 'value':SGDRegressor, 'kwargs': {'max_iter' : 100000}}, +] +MODEL_NAMES=['LinearRegression','RandomForestRegressor','LinearSVR','SGDRegressor'] + +## train model only if the user's number of jobs is at least MIN_NB_JOBS +MIN_NB_JOBS=[10] +## Further reduce train set size (used for testing, default must be 0.0) +## For info: regression train/test split is 0.66 train and 0.33 test +#PERCENTAGE_TRAINING_DATA=[0.75,0.5,0.25,0.0] +PERCENTAGE_TRAINING_DATA=[0.0] +## Columns for regression training experiments parameters +EXP_REG_COLUMNS=['min_reg_nb_jobs', 'model', 'percentage_training_data'] +## Columns for historical prediction experiments parameters +EXP_HIST_COLUMNS=['power_factor', 
'same_nb_resources'] +## Job Features +# I could add "sime_limit_str" and "tres_per_node" but this needs more preprocessing +FEATURE_LABELS=['num_cpus', 'num_nodes', 'submission_date_hour', "walltime"] +#FEATURE_LABELS=['num_cpus','submission_date_hour', 'nb_sockets'] +## Lag features used in regression +LAG_FEATURE_LABELS=['lag_total_power_mean', 'lag_total_power_std', 'lag_total_power_prev'] +## The value we want to predict +TARGET_LABEL='total_power_max_watts' + +SVR_TO_LINEARSVR_THRESHOLD = 10000 + +# Define the parameter grid +param_grid = { + 'SVR':{ + 'svr__C': np.logspace(-3, 3, 7), + 'svr__gamma': np.logspace(-3, 3, 7), + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'LinearSVR': { + 'linearsvr__C': np.logspace(-3, 3, 7), # Use 'linearsvr__C' for LinearSVR + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'RandomForestRegressor': { + 'randomforestregressor__n_estimators': [10, 50, 100, 200], + 'randomforestregressor__max_depth': [None, 10, 20, 30], + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'LinearRegression': { + #'linearregression__': {}, + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'SGDRegressor': { + 'sgdregressor__alpha': np.logspace(-3, 3, 7), + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + } +} + +## Create SKlearn pipeline according to the model +def create_pipeline(model_conf): + model_obj = model_conf["value"](**model_conf["kwargs"]) + pipeline = make_pipeline(StandardScaler(), RFE(LinearRegression(n_jobs=-1)), model_obj) + return pipeline + +# Function to convert time strings to total seconds +def time_str_to_seconds(time_str): + #print(time_str) + components = time_str.split('-') + + if len(components) == 1: # No days component + time_parts = time_str.split(':') + if len(time_parts) == 1: # Format M:SS 
+ time_str = "0:" + time_str + elif len(time_parts) == 2: # Format MM:SS + time_str = "00:" + time_str + return pd.to_timedelta(time_str).total_seconds() + elif len(components) == 2: # Format DD-HH:MM:SS + days, time_part = components + time_seconds = pd.to_timedelta(time_part).total_seconds() + return int(days) * 24 * 3600 + time_seconds + else: + return 180 * SEC_ONE_DAY # Handle infinite = 180 days = 6 months + +""" +Read and prepare data to use in the prediction +""" +def read_data(jobfile, user): + ## Holds the first job of each user (key=user_id) + dct_users_first_job_timestamp={} + + ## Holds a dataframe of jobs for each user (key=user_id) + jobs_users={} + + ## Read jobfile + df_jobs = pd.read_csv(jobfile, low_memory=False) + + ## Convert datetime columns to integer timestamps in seconds + df_jobs["submit_time"] = pd.to_datetime(df_jobs['submit_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["start_time"] = pd.to_datetime(df_jobs['start_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["end_time"] = pd.to_datetime(df_jobs['end_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["walltime"] = df_jobs['time_limit_str'].apply(time_str_to_seconds) + + lst_users = [user] + + ## Sort jobs by submission time + df_jobs=df_jobs.sort_values(by='submit_time').reset_index(drop=True) + ## Get (back) datetime objects + df_jobs['submission_datetime'] = pd.to_datetime(df_jobs['submit_time'], unit='s') + df_jobs['submission_date_hour'] = pd.DatetimeIndex(df_jobs['submission_datetime']).hour + + ## Initialize new columns + df_jobs['user_submission_week']=np.nan + df_jobs['lag_total_power_mean']=np.nan + df_jobs['lag_total_power_std']=np.nan + df_jobs['lag_total_power_prev']=np.nan + df_jobs['hist_pred_total_power_max']=np.nan + #df_jobs['reg_pred_total_power_mean']=np.nan + col_names = [model_name+"_"+TARGET_LABEL for model_name in MODEL_NAMES] + ## df.assign(x=...) would create a literal column "x"; set the columns directly + for col_name in col_names: + df_jobs[col_name] = np.nan + + ## Populate jobs_users and dct_users_first_job_timestamp + ## 
Also computing the week number of the job submission + for user in lst_users: + jobs_user = df_jobs.loc[df_jobs['user_id']==user].copy().sort_values(by='submit_time').reset_index() + dct_users_first_job_timestamp[user] = jobs_user.head(1)['submit_time'].values[0] + first_job_timestamp = dct_users_first_job_timestamp[user] + jobs_user['user_submission_week'] = (jobs_user['submit_time']-first_job_timestamp)//SEC_ONE_WEEK + jobs_users[user]=jobs_user + #print(user, len(jobs_users[user])) + #print(jobs_user) + #break + + return lst_users, jobs_users, dct_users_first_job_timestamp + +""" +Run prediction experiments +""" +def run_experiments(hist_design, + reg_design, + jobs_users, + dct_users_jobs, + dct_users_first_job_timestamp, + lst_results, + lst_predicted_jobs, + user=None): + + ## Initialize the dict for the user + dct_users_jobs[user] = {} + + ## Get the jobs dataframe of the user + jobs_user = jobs_users[user] + + ## Get the list of weeks in which the user submitted jobs (e.g., [0.0, 1.0, 3.0]) + jobs_weeks=jobs_user['user_submission_week'].drop_duplicates().sort_values().to_list() + + + ## Get the timestamp of the user's first job + first_job_timestamp = dct_users_first_job_timestamp[user] + + ## Run the prediction routine for each week the user submitted jobs + for week in jobs_weeks: + ## Get the dataframe of the user's jobs at week + user_jobs_week = jobs_user.loc[jobs_user['user_submission_week']==week].copy() + + ## for each job in week + for index, job in user_jobs_week.iterrows(): + if index == 0: + continue + else: + ################################################################## + ### Weighted mean of the previously finished jobs in the job history + ### add the mean power of previous user's jobs + ### (weighted history approach) as features + ################################################################## + + ## Submission timestamp of the job + job_subm_time = job['submit_time'] + + ## Timestamp where the history will start + history_start = 
first_job_timestamp + + ## History window size (start counting since the first user job submission) + history_size = job_subm_time - history_start + + ## Get only jobs that finished before the row submission time + prev_jobs = jobs_user.loc[jobs_user['end_time']<=job_subm_time].copy().sort_values(by='end_time').reset_index(drop=True) + + ## Further filter jobs if same_nb_resources (evals to False by default) + if hist_design['same_nb_resources'].values[0] == True: + prev_jobs=prev_jobs.loc[prev_jobs['nb_resources']==job['nb_resources']] + + if len(prev_jobs) < 1: + ## No job has finished before job_subm_time + continue + else: + ## Compute the standard deviation of the jobs in the history (lag feature) + user_jobs_week.loc[index, 'lag_total_power_std']=prev_jobs[TARGET_LABEL].std() + + ## Assign weights for the previous jobs in the history according to the power_factor + prev_jobs['weight'] = 1 - ((job_subm_time - prev_jobs['end_time'])/history_size) + prev_jobs['weight'] = prev_jobs['weight'].pow(hist_design['power_factor'].values[0]) + + ## Compute the mean of the jobs in the history (lag feature) + user_jobs_week.loc[index,'lag_total_power_mean'] = (prev_jobs[TARGET_LABEL]*prev_jobs['weight']).sum()/prev_jobs['weight'].sum() + + ## Compute the mean of the jobs in the history (historical prediction result) + user_jobs_week.loc[index,'hist_pred_total_power_max'] = user_jobs_week.loc[index,'lag_total_power_mean'] + + ## Get the power metric of the most recent user job (lag feature, not used by default) + user_jobs_week.loc[index,'lag_total_power_prev']=prev_jobs.iloc[-1][TARGET_LABEL] + + ## Add the user's week jobs in the dict + dct_users_jobs[user][week]=user_jobs_week + + prev_weeks=[] + + #################################################################################### + ### Regression multi-model power prediction + ### For each week >= 1 use the first 2/3 of the user's finished job history as train + ### and the remaining 1/3 as test + ### Run regression 
fitting for each of the models in MODELS + ### Select the best regression model regarding the test mean squared error (MSE) + ### Compare with the MSE of the historic prediction on the test + ### For week+1 use the model (regression or historic) with the lowest MSE + #################################################################################### + for i, week in enumerate(jobs_weeks): + + ## We skip the first week for regression + if i == 0: + prev_weeks.append(week) + continue + + ## Get the user's jobs at the current week (only jobs with historic data prediction) + user_jobs_week=dct_users_jobs[user][week] + + ## Get all previous jobs for regression fitting + model_jobs_user=pd.concat([dct_users_jobs[user][w] for w in prev_weeks]).dropna(subset=LAG_FEATURE_LABELS) + + ## Add week to the list of previous weeks + ### This will be used at the next week iteration + prev_weeks.append(week) + + ##DEBUG + #print(len(model_jobs_user)) + + regression=False + + ## Iterate over the experiments parameters (which vary the models) + for index, exp in reg_design.iterrows(): + ## Only do regression training if the number of jobs to train is >= min_reg_nb_jobs (10 by default) + if len(model_jobs_user) >= exp['min_reg_nb_jobs']: + regression=True + + ## Append result data to the experiments parameters dict + dct_result = exp.to_dict() + dct_result['user_id']=user + dct_result['user_week']=week + dct_result['hist_power_factor']=hist_design['power_factor'].values[0] + dct_result['hist_same_nb_resources']=hist_design['same_nb_resources'].values[0] + + ## Select features and target label + X=model_jobs_user[FEATURE_LABELS+LAG_FEATURE_LABELS] + y=model_jobs_user[TARGET_LABEL] + + ## Train test split + ## Test split is important if we want to switch between algorithms at each week + #X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, shuffle=False) + + ## With this we only evaluate the training performance. 
+ ## This is ok, since X, y are jobs from previous weeks. + ## We "test" the models at the current week, which is not present in X, y + X_train, y_train = X, y + + ## exp['percentage_training_data'] evals to 0 -> nothing changes + X_train = X_train.iloc[int(len(X_train)*exp['percentage_training_data']):len(X_train)] + y_train = y_train.iloc[int(len(y_train)*exp['percentage_training_data']):len(y_train)] + + ## Create an ML pipeline of standard scaling -> RFE -> model and then train the model + + pipeline = create_pipeline(exp["model"]) + + #print("Week ", week, "Model: ", model, "Train Size: ", len(X_train)) + + reg = GridSearchCV(pipeline, param_grid[exp["model"]["key"]], cv=5, scoring='neg_mean_squared_error', n_jobs=-1).fit(X_train, y_train) + + ## Append result data to the experiments parameters dict + dct_result['model_name']=exp['model']['key'] + + best_predictor = reg.best_estimator_ + dct_result['model_object']=best_predictor + + ## Predict on the training dataset (we only evaluate the training performance here) + test_predictions=best_predictor.predict(X_train) + + ## Append result data to the experiments parameters dict + dct_result['model']=exp['model']['key'] + + # Access the selected features using the RFE estimator from the best model + best_rfe = reg.best_estimator_.named_steps['rfe'] + selected_features = np.where(best_rfe.support_)[0] + + cols = FEATURE_LABELS+LAG_FEATURE_LABELS + feature_names = [cols[x] for x in selected_features] + dct_result['features']=feature_names + #print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names) + dct_result['reg_total_power_max_mape']=mean_absolute_percentage_error(y_train, test_predictions) + dct_result['reg_total_power_max_mae']=mean_absolute_error(y_train, test_predictions) + dct_result['reg_total_power_max_mse']=mean_squared_error(y_train, test_predictions) + + print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names, "MAPE: ", dct_result['reg_total_power_max_mape'], "MAE: ", 
dct_result['reg_total_power_max_mae'], "MSE: ", dct_result['reg_total_power_max_mse']) + ### Results of the historic data power prediction + dct_result['hist_total_power_max_mape']=mean_absolute_percentage_error(y_train, X_train['lag_total_power_mean']) + dct_result['hist_total_power_max_mse']=mean_squared_error(y_train, X_train['lag_total_power_mean']) + + ## Append to the global list of training results + lst_results.append(dct_result) + + ## If regression training was performed using prev_weeks, select the best model for week + if regression == True: + + ## Get the week prediction results from the global list lst_results + df_week_results=pd.DataFrame(lst_results) + df_week_results=df_week_results.loc[df_week_results['user_week']==week] + + #print(df_week_results) + + ## Predict for week using all models + for _, result in df_week_results.iterrows(): + model_name = result['model_name'] + model_object = result['model_object'] + + col_label = model_name+"_"+TARGET_LABEL + user_jobs_week[col_label]=model_object.predict(user_jobs_week[FEATURE_LABELS+LAG_FEATURE_LABELS]) + + lst_predicted_jobs.append(user_jobs_week) + + return lst_predicted_jobs + +""" +Start prediction routine +""" +def predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp): + # Construct the design of experiments for the prediction + ## Historical prediction design + ## (POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"]) evals to (2.0, False) + design_hist_pred=[(POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"])] + + ## Regression design + ## MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA evals to [10], MODELS, [0.0] + ## I switch between SVR and LinearSVR if the training dataset is larger than 10k, so MODELS are temporarily ignored + design_reg_pred=list(product(MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA)) + + df_hist_design=pd.DataFrame(design_hist_pred, columns=EXP_HIST_COLUMNS) + df_reg_design=pd.DataFrame(design_reg_pred, columns=EXP_REG_COLUMNS) + + ## Holds the list of 
regression results (TO VERIFY) + lst_results=[] + ## Holds the list of predicted jobs + ## Each position is a dataframe of predicted jobs of a user + lst_predicted_jobs=[] + ## Holds the weekly job submission for each user + ## Key is [user][week] + dct_users_jobs={} + + #for index, exp in df_ff_design.iterrows(): + for user in lst_users: + #print('user: ', user) + dct_users_jobs[user] = {} + lst_predicted_jobs = run_experiments(df_hist_design, + df_reg_design, + jobs_users, + dct_users_jobs, + dct_users_first_job_timestamp, + lst_results, + lst_predicted_jobs, + user=user) + #print(str(index) + ' of ' + str(len(df_ff_design)), end='\r') + + df_predicted_jobs = [] + + if lst_predicted_jobs: + df_predicted_jobs=pd.concat(lst_predicted_jobs) + + if len(df_predicted_jobs) != 0: + df_predicted_jobs=df_predicted_jobs.dropna(subset=['total_power_max_watts','hist_pred_total_power_max']) + + ## Print results stats, no longer needed + """ + for user in lst_users: + #print('user: ', user) + df_predicted_user=df_predicted_jobs.loc[df_predicted_jobs['user_id']==user] + print('number of predicted jobs: ', len(df_predicted_user)) + if len(df_predicted_user) == 0: + continue + df_predicted_user_hist = df_predicted_user.loc[df_predicted_user['pred_method'] == 'history'] + if(len(df_predicted_user_hist) > 0): + print('historic MSE when used only: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean'])) + print('historic MAPE when used only: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean'])) + print('historic MSE when selected: ', mean_squared_error(df_predicted_user_hist['total_power_mean_watts'], df_predicted_user_hist['hist_pred_total_power_mean'])) + #df_predicted_jobs_notnan=df_predicted_user.dropna(subset='reg_pred_pp0_mean') + df_predicted_user_reg = df_predicted_user.loc[df_predicted_user['pred_method'] == 'regression'] + 
if(len(df_predicted_user_reg) > 0): + print('regression MSE when selected: ', mean_squared_error(df_predicted_user_reg['total_power_mean_watts'], df_predicted_user_reg['reg_pred_total_power_mean'])) + print('final MSE: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + print('final MAPE: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + print('final MAE: ', mean_absolute_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + """ + + return df_predicted_jobs + +""" +Save predicted jobs data to csv +""" +def save_results(df_predicted_jobs, outputfile): + if len(df_predicted_jobs) == 0: + return + else: + df_predicted_jobs.to_csv(outputfile, index=False) + +""" +Run workflow +""" +def run_workflow(jobfile, outputfile, user): + lst_users, jobs_users, dct_users_first_job_timestamp = read_data(jobfile, user) + df_predicted_jobs = predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp) + save_results(df_predicted_jobs, outputfile) + +""" +Read Command line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Run the sophisticated power prediction method') + + p.add_argument("--jobfile", "-j", type=str, required=True, + help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)") + p.add_argument("--outputfile", "-o", type=str, required=True, + help="Output file name (optionally with global path)") + p.add_argument("--user", "-u", type=int, required=True, + help="User ID") + + return(p.parse_args()) + +def main(): + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + args = read_cli() + run_workflow(args.jobfile, args.outputfile, args.user) + +if __name__ == '__main__': + main() diff --git 
a/scripts-py/expe_energumen/predict_jobs_power_allmethods_mean.py b/scripts-py/expe_energumen/predict_jobs_power_allmethods_mean.py new file mode 100644 index 0000000000000000000000000000000000000000..269e3116930ac8ce90914cbb30dc4d532d83f069 --- /dev/null +++ b/scripts-py/expe_energumen/predict_jobs_power_allmethods_mean.py @@ -0,0 +1,491 @@ +#!/usr/bin/env python3 +import sys +import argparse +import numpy as np +import pandas as pd +from sklearn.model_selection import train_test_split +from sklearn.linear_model import LinearRegression, Lasso, Ridge, SGDRegressor +from sklearn.svm import SVR, LinearSVR +from sklearn.ensemble import HistGradientBoostingRegressor, RandomForestRegressor +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import StandardScaler +from sklearn.model_selection import train_test_split, GridSearchCV +from sklearn.feature_selection import RFE +from sklearn.metrics import r2_score, mean_absolute_percentage_error, mean_absolute_error, mean_squared_error +from itertools import product + +""" +Initial variables +""" +## Time constants +NANOSECONDS_ONE_SECOND = 1e9 +SEC_ONE_MINUTE=60 +SEC_ONE_HOUR=60*SEC_ONE_MINUTE +SEC_ONE_DAY=24*SEC_ONE_HOUR +SEC_ONE_WEEK=7*SEC_ONE_DAY + +# Experiments parameters +## values for job submission time difference penalization +POWER_FACTORS={"sublinear": 0.5, + "linear": 1.0, + "quadratic": 2.0} +## Predict using only jobs with the same nb of resources +SAME_NB_RESOURCES={"yes": True, + "no": False} + + +## List of models used in the regression +MODELS=[ + {'key':'LinearRegression', 'value':LinearRegression, 'kwargs': {'n_jobs' : -1}}, + {'key':'RandomForestRegressor', 'value':RandomForestRegressor, 'kwargs': {'n_jobs' : -1}}, + {'key':'LinearSVR', 'value':LinearSVR, 'kwargs': {'max_iter' : 100000}}, + {'key':'SGDRegressor', 'value':SGDRegressor, 'kwargs': {'max_iter' : 100000}}, +] +MODEL_NAMES=['LinearRegression','RandomForestRegressor','LinearSVR','SGDRegressor'] + +## train model only if the 
user's number of jobs is at least MIN_NB_JOBS +MIN_NB_JOBS=[10] +## Further reduce train set size (used for testing, default must be 0.0) +## For info: regression train/test split is 0.66 train and 0.33 test +#PERCENTAGE_TRAINING_DATA=[0.75,0.5,0.25,0.0] +PERCENTAGE_TRAINING_DATA=[0.0] +## Columns for regression training experiments parameters +EXP_REG_COLUMNS=['min_reg_nb_jobs', 'model', 'percentage_training_data'] +## Columns for historical prediction experiments parameters +EXP_HIST_COLUMNS=['power_factor', 'same_nb_resources'] +## Job Features +# I could add "sime_limit_str" and "tres_per_node" but this needs more preprocessing +FEATURE_LABELS=['num_cpus', 'num_nodes', 'submission_date_hour', "walltime"] +#FEATURE_LABELS=['num_cpus','submission_date_hour', 'nb_sockets'] +## Lag features used in regression +LAG_FEATURE_LABELS=['lag_total_power_mean', 'lag_total_power_std', 'lag_total_power_prev'] +## The value we want to predict +TARGET_LABEL='total_power_mean_watts' + +SVR_TO_LINEARSVR_THRESHOLD = 10000 + +# Define the parameter grid +param_grid = { + 'SVR':{ + 'svr__C': np.logspace(-3, 3, 7), + 'svr__gamma': np.logspace(-3, 3, 7), + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'LinearSVR': { + 'linearsvr__C': np.logspace(-3, 3, 7), # Use 'linearsvr__C' for LinearSVR + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'RandomForestRegressor': { + 'randomforestregressor__n_estimators': [10, 50, 100, 200], + 'randomforestregressor__max_depth': [None, 10, 20, 30], + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'LinearRegression': { + #'linearregression__': {}, + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'SGDRegressor': { + 'sgdregressor__alpha': np.logspace(-3, 3, 7), + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + } +} + +## 
Create the sklearn pipeline according to the model +def create_pipeline(model_conf): + model_obj = model_conf["value"](**model_conf["kwargs"]) + pipeline = make_pipeline(StandardScaler(), RFE(LinearRegression(n_jobs=-1)), model_obj) + return pipeline + +# Function to convert time strings to total seconds +def time_str_to_seconds(time_str): + #print(time_str) + components = time_str.split('-') + + if len(components) == 1: # No days component + time_parts = time_str.split(':') + if len(time_parts) == 1: # Format M (minutes only) + time_str = "0:" + time_str + elif len(time_parts) == 2: # Format MM:SS + time_str = "00:" + time_str + return pd.to_timedelta(time_str).total_seconds() + elif len(components) == 2: # Format DD-HH:MM:SS + days, time_part = components + time_seconds = pd.to_timedelta(time_part).total_seconds() + return int(days) * 24 * 3600 + time_seconds + else: + return 180 * SEC_ONE_DAY # Handle infinite = 180 days = 6 months + +""" +Read and prepare data to use in the prediction +""" +def read_data(jobfile, user): + ## Holds the first job of each user (key=user_id) + dct_users_first_job_timestamp={} + + ## Holds a dataframe of jobs for each user (key=user_id) + jobs_users={} + + ## Read jobfile + df_jobs = pd.read_csv(jobfile, low_memory=False) + + ## Convert datetime columns to integer timestamps in seconds + df_jobs["submit_time"] = pd.to_datetime(df_jobs['submit_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["start_time"] = pd.to_datetime(df_jobs['start_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["end_time"] = pd.to_datetime(df_jobs['end_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["walltime"] = df_jobs['time_limit_str'].apply(time_str_to_seconds) + + lst_users = [user] + + ## Sort jobs by submission time + df_jobs=df_jobs.sort_values(by='submit_time').reset_index(drop=True) + ## Get (back) datetime objects + df_jobs['submission_datetime'] = pd.to_datetime(df_jobs['submit_time'], unit='s') + df_jobs['submission_date_hour'] = 
pd.DatetimeIndex(df_jobs['submission_datetime']).hour + + ## Initialize new columns + df_jobs['user_submission_week']=np.nan + df_jobs['lag_total_power_mean']=np.nan + df_jobs['lag_total_power_std']=np.nan + df_jobs['lag_total_power_prev']=np.nan + df_jobs['hist_pred_total_power_mean']=np.nan + #df_jobs['reg_pred_total_power_mean']=np.nan + col_names = [model_name+"_"+TARGET_LABEL for model_name in MODEL_NAMES] + for col in col_names: df_jobs[col] = np.nan # initialize one prediction column per model + + ## Populate jobs_users and dct_users_first_job_timestamp + ## Also computing the week number of the job submission + for user in lst_users: + jobs_user = df_jobs.loc[df_jobs['user_id']==user].copy().sort_values(by='submit_time').reset_index() + dct_users_first_job_timestamp[user] = jobs_user.head(1)['submit_time'].values[0] + first_job_timestamp = dct_users_first_job_timestamp[user] + jobs_user['user_submission_week'] = (jobs_user['submit_time']-first_job_timestamp)//SEC_ONE_WEEK + jobs_users[user]=jobs_user + #print(user, len(jobs_users[user])) + #print(jobs_user) + #break + + return lst_users, jobs_users, dct_users_first_job_timestamp + +""" +Run prediction experiments +""" +def run_experiments(hist_design, + reg_design, + jobs_users, + dct_users_jobs, + dct_users_first_job_timestamp, + lst_results, + lst_predicted_jobs, + user=None): + + ## Initialize the dict of the user's weekly jobs + dct_users_jobs[user] = {} + + ## Get the jobs dataframe of the user + jobs_user = jobs_users[user] + + ## Get the list of weeks that the user submitted jobs (e.g., [0.0, 1.0, 3.0]) + jobs_weeks=jobs_user['user_submission_week'].drop_duplicates().sort_values().to_list() + + + ## Get the timestamp of the user's first job + first_job_timestamp = dct_users_first_job_timestamp[user] + + ## Run the prediction routine for each week the user submitted jobs + for week in jobs_weeks: + ## Get the dataframe of user's jobs at week + user_jobs_week = jobs_user.loc[jobs_user['user_submission_week']==week].copy() + + ## For each job in the week + for 
index, job in user_jobs_week.iterrows(): + if index == 0: + continue + else: + ################################################################## + ### Weighted mean of the previously finished jobs in the job history + ### Add the weighted mean power of the user's previous jobs + ### (weighted history approach) as features + ################################################################## + + ## Submission timestamp of the job + job_subm_time = job['submit_time'] + + ## Timestamp where the history will start + history_start = first_job_timestamp + + ## History window size (start counting since the first user job submission) + history_size = job_subm_time - history_start + + ## Get only jobs that finished before the job's submission time + prev_jobs = jobs_user.loc[jobs_user['end_time']<=job_subm_time].copy().sort_values(by='end_time').reset_index(drop=True) + + ## Further filter jobs if same_nb_resources (evaluates to False by default) + if hist_design['same_nb_resources'].values[0]: + prev_jobs=prev_jobs.loc[prev_jobs['nb_resources']==job['nb_resources']] + + if len(prev_jobs) < 1: + ## No job has finished before job_subm_time + continue + else: + ## Compute the standard deviation of the jobs in the history (lag feature) + user_jobs_week.loc[index, 'lag_total_power_std']=prev_jobs['total_power_mean_watts'].std() + + ## Assign weights for the previous jobs in the history according to the power_factor + prev_jobs['weight'] = 1 - ((job_subm_time - prev_jobs['end_time'])/history_size) + prev_jobs['weight'] = prev_jobs['weight'].pow(hist_design['power_factor'].values[0]) + + ## Compute the weighted mean of the jobs in the history (lag feature) + user_jobs_week.loc[index,'lag_total_power_mean'] = (prev_jobs['total_power_mean_watts']*prev_jobs['weight']).sum()/prev_jobs['weight'].sum() + + ## Use the weighted mean as the historical prediction result + user_jobs_week.loc[index,'hist_pred_total_power_mean'] = user_jobs_week.loc[index,'lag_total_power_mean'] + + ## Get the 
power metric of the most recent user job (lag feature, not used by default) + user_jobs_week.loc[index,'lag_total_power_prev']=prev_jobs.iloc[-1]['total_power_mean_watts'] + + ## Add the user's week jobs in the dict + dct_users_jobs[user][week]=user_jobs_week + + prev_weeks=[] + + #################################################################################### + ### Regression multi-model power prediction + ### For each week >= 1 use all of the user's finished jobs from previous weeks as train + ### and the jobs of the current week as test + ### Run regression fitting for each of the models in MODELS + ### Select the best regression model regarding the test mean squared error (MSE) + ### Compare with the MSE of the historic prediction on the test + ### For week+1 use the model (regression or historic) with the lowest MSE + #################################################################################### + for i, week in enumerate(jobs_weeks): + + ## We skip the first week for regression + if i == 0: + prev_weeks.append(week) + continue + + ## Get the user's jobs at the current week (only jobs with historic data prediction) + user_jobs_week=dct_users_jobs[user][week] + + ## Get all previous jobs for regression fitting + model_jobs_user=pd.concat([dct_users_jobs[user][w] for w in prev_weeks]).dropna(subset=LAG_FEATURE_LABELS) + + ## Add week to the list of previous weeks + ### This will be used at the next week iteration + prev_weeks.append(week) + + ##DEBUG + #print(len(model_jobs_user)) + + regression=False + + ## Iterate over the experiments parameters (which vary the models) + for index, exp in reg_design.iterrows(): + ## Only do regression training if the number of jobs to train >= min_reg_nb_jobs (10 by default) + if len(model_jobs_user) >= exp['min_reg_nb_jobs']: + regression=True + + ## Append result data to the experiments parameters dict + dct_result = exp.to_dict() + dct_result['user_id']=user + dct_result['user_week']=week + 
dct_result['hist_power_factor']=hist_design['power_factor'].values[0] + dct_result['hist_same_nb_resources']=hist_design['same_nb_resources'].values[0] + + ## Select features and target label + X=model_jobs_user[FEATURE_LABELS+LAG_FEATURE_LABELS] + y=model_jobs_user[TARGET_LABEL] + + ## Train test split + ## Test split is important if we want to switch between algorithms at each week + #X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, shuffle=False) + + ## With this we only evaluate the training performance. + ## This is OK, since X, y are jobs from previous weeks. + ## We "test" the models at the current week, which is not present in X, y + X_train, y_train = X, y + + ## exp['percentage_training_data'] evaluates to 0 by default, so nothing changes + X_train = X_train.iloc[int(len(X_train)*exp['percentage_training_data']):len(X_train)] + y_train = y_train.iloc[int(len(y_train)*exp['percentage_training_data']):len(y_train)] + + ## Create an ML pipeline of standard scaling -> RFE -> model and then train the model + + pipeline = create_pipeline(exp["model"]) + + #print("Week ", week, "Model: ", model, "Train Size: ", len(X_train)) + + reg = GridSearchCV(pipeline, param_grid[exp["model"]["key"]], cv=5, scoring='neg_mean_squared_error', n_jobs=-1).fit(X_train, y_train) + + ## Append result data to the experiments parameters dict + dct_result['model_name']=exp['model']['key'] + #dct_result['model_name']=model + + best_predictor = reg.best_estimator_ + dct_result['model_object']=best_predictor + + ## Predict on the train dataset + test_predictions=best_predictor.predict(X_train) + + ## Append result data to the experiments parameters dict + dct_result['model']=exp['model']['key'] + + # Access the selected features using the RFE estimator from the best model + best_rfe = reg.best_estimator_.named_steps['rfe'] + selected_features = np.where(best_rfe.support_)[0] + + cols = FEATURE_LABELS+LAG_FEATURE_LABELS + feature_names = [cols[x] for x in selected_features] + 
dct_result['features']=feature_names + #print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names) + dct_result['reg_total_power_mean_mape']=mean_absolute_percentage_error(y_train, test_predictions) + dct_result['reg_total_power_mean_mae']=mean_absolute_error(y_train, test_predictions) + dct_result['reg_total_power_mean_mse']=mean_squared_error(y_train, test_predictions) + print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names, "MAPE: ", dct_result['reg_total_power_mean_mape'], "MAE: ", dct_result['reg_total_power_mean_mae'], "MSE: ", dct_result['reg_total_power_mean_mse']) + + ### Results of the historic data power prediction + dct_result['hist_total_power_mean_mape']=mean_absolute_percentage_error(y_train, X_train['lag_total_power_mean']) + dct_result['hist_total_power_mean_mae']=mean_absolute_error(y_train, X_train['lag_total_power_mean']) + dct_result['hist_total_power_mean_mse']=mean_squared_error(y_train, X_train['lag_total_power_mean']) + #print("User: ", user, "Week ", week, "Model: ", "History", "MAPE: ", dct_result['hist_total_power_mean_mape'], "MAE: ", dct_result['hist_total_power_mean_mae'], "MSE: ", dct_result['hist_total_power_mean_mse']) + + ## Append to the global list of training results + lst_results.append(dct_result) + + ## If regression training was performed using prev_weeks, select the best model for week + if regression == True: + + ## Get the week prediction results from the global list lst_results + df_week_results=pd.DataFrame(lst_results) + df_week_results=df_week_results.loc[df_week_results['user_week']==week] + + #print(df_week_results) + + ## Predict for week using all models + for _, result in df_week_results.iterrows(): + model_name = result['model_name'] + model_object = result['model_object'] + + col_label = model_name+"_"+TARGET_LABEL + user_jobs_week[col_label]=model_object.predict(user_jobs_week[FEATURE_LABELS+LAG_FEATURE_LABELS]) + + 
lst_predicted_jobs.append(user_jobs_week) + + return lst_predicted_jobs + +""" +Start prediction routine +""" +def predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp): + # Construct the design of experiments for the prediction + ## Historical prediction design + ## (POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"]) evaluates to (2.0, False) + design_hist_pred=[(POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"])] + + ## Regression design + ## MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA evaluates to [10], MODELS, [0.0] + ## I switch between SVR and LinearSVR if the training dataset is larger than 10k, so MODELS are temporarily ignored + design_reg_pred=list(product(MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA)) + + df_hist_design=pd.DataFrame(design_hist_pred, columns=EXP_HIST_COLUMNS) + df_reg_design=pd.DataFrame(design_reg_pred, columns=EXP_REG_COLUMNS) + #print(len(df_ff_design)) + + ## Holds the list of regression results + lst_results=[] + ## Holds the list of predicted jobs + ## Each position is a dataframe of predicted jobs of a user + lst_predicted_jobs=[] + ## Holds the weekly job submission for each user + ## Key is [user][week] + dct_users_jobs={} + + #for index, exp in df_ff_design.iterrows(): + for user in lst_users: + #print('user: ', user) + dct_users_jobs[user] = {} + lst_predicted_jobs = run_experiments(df_hist_design, + df_reg_design, + jobs_users, + dct_users_jobs, + dct_users_first_job_timestamp, + lst_results, + lst_predicted_jobs, + user=user) + #print(str(index) + ' of ' + str(len(df_ff_design)), end='\r') + + df_predicted_jobs = [] + + if lst_predicted_jobs: + df_predicted_jobs=pd.concat(lst_predicted_jobs) + + if len(df_predicted_jobs) != 0: + df_predicted_jobs=df_predicted_jobs.dropna(subset=['total_power_mean_watts','hist_pred_total_power_mean']) + + ## Print results stats, no longer needed + """ + for user in lst_users: + #print('user: ', user) + 
df_predicted_user=df_predicted_jobs.loc[df_predicted_jobs['user_id']==user] + print('number of predicted jobs: ', len(df_predicted_user)) + if len(df_predicted_user) == 0: + continue + df_predicted_user_hist = df_predicted_user.loc[df_predicted_user['pred_method'] == 'history'] + if(len(df_predicted_user_hist) > 0): + print('historic MSE when used only: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean'])) + print('historic MAPE when used only: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean'])) + print('historic MSE when selected: ', mean_squared_error(df_predicted_user_hist['total_power_mean_watts'], df_predicted_user_hist['hist_pred_total_power_mean'])) + #df_predicted_jobs_notnan=df_predicted_user.dropna(subset='reg_pred_pp0_mean') + df_predicted_user_reg = df_predicted_user.loc[df_predicted_user['pred_method'] == 'regression'] + if(len(df_predicted_user_reg) > 0): + print('regression MSE when selected: ', mean_squared_error(df_predicted_user_reg['total_power_mean_watts'], df_predicted_user_reg['reg_pred_total_power_mean'])) + print('final MSE: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + print('final MAPE: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + print('final MAE: ', mean_absolute_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + """ + + return df_predicted_jobs + +""" +Save predicted jobs data to csv +""" +def save_results(df_predicted_jobs, outputfile): + if len(df_predicted_jobs) == 0: + return + else: + df_predicted_jobs.to_csv(outputfile, index=False) + +""" +Run workflow +""" +def run_workflow(jobfile, outputfile, user): + lst_users, jobs_users, dct_users_first_job_timestamp = read_data(jobfile, user) 
+ df_predicted_jobs = predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp) + save_results(df_predicted_jobs, outputfile) + +""" +Read the command-line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Run the sophisticated power prediction method') + + p.add_argument("--jobfile", "-j", type=str, required=True, + help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)") + p.add_argument("--outputfile", "-o", type=str, required=True, + help="Output file name (optionally with global path)") + p.add_argument("--user", "-u", type=int, required=True, + help="User ID") + + return(p.parse_args()) + +def main(): + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + args = read_cli() + run_workflow(args.jobfile, args.outputfile, args.user) + +if __name__ == '__main__': + main() diff --git a/scripts-py/expe_energumen/run_prediction_per_user_allmethods_max.py b/scripts-py/expe_energumen/run_prediction_per_user_allmethods_max.py new file mode 100644 index 0000000000000000000000000000000000000000..0074f737b0b643dca11054b0bb4ff637d1fafaa9 --- /dev/null +++ b/scripts-py/expe_energumen/run_prediction_per_user_allmethods_max.py @@ -0,0 +1,48 @@ +import pandas as pd +import subprocess +import sys +import argparse +import os +import functools +from multiprocessing import Pool + +""" +Run Workflow +""" +def run_workflow(jobfile, outdir): + # Load the dataframe with "user_id" column + df = pd.read_csv(jobfile) + + # Iterate over each user_id and run predict-jobs-power-allmethods-max for it + for user_id in df['user_id'].drop_duplicates(): + outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz' + # Skip user if prediction was already performed + if os.path.isfile(outfile): + continue + command = ['predict-jobs-power-allmethods-max', "-j", jobfile, '-o', outfile, '-u', str(user_id)] + 
subprocess.run(command) + +""" +Read the command-line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Run the sophisticated power prediction method, but one call for each user') + + p.add_argument("--jobfile", "-j", type=str, required=True, + help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)") + p.add_argument("--outdir", "-o", type=str, required=True, + help="Directory of the output files") + + return(p.parse_args()) + +def main(): + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + args = read_cli() + run_workflow(args.jobfile, args.outdir) + +if __name__ == '__main__': + main() diff --git a/scripts-py/expe_energumen/run_prediction_per_user_allmethods_mean.py b/scripts-py/expe_energumen/run_prediction_per_user_allmethods_mean.py new file mode 100644 index 0000000000000000000000000000000000000000..7d90605a2e2b36d96b95afffc0708c6a1dbd81c7 --- /dev/null +++ b/scripts-py/expe_energumen/run_prediction_per_user_allmethods_mean.py @@ -0,0 +1,48 @@ +import pandas as pd +import subprocess +import sys +import argparse +import os +import functools +from multiprocessing import Pool + +""" +Run Workflow +""" +def run_workflow(jobfile, outdir): + # Load the dataframe with "user_id" column + df = pd.read_csv(jobfile) + + # Iterate over each user_id and run predict-jobs-power-allmethods-mean for it + for user_id in df['user_id'].drop_duplicates(): + outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz' + # Skip user if prediction was already performed + if os.path.isfile(outfile): + continue + command = ['predict-jobs-power-allmethods-mean', "-j", jobfile, '-o', outfile, '-u', str(user_id)] + subprocess.run(command) + +""" +Read the command-line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Run the sophisticated power prediction method, but one 
call for each user') + + p.add_argument("--jobfile", "-j", type=str, required=True, + help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)") + p.add_argument("--outdir", "-o", type=str, required=True, + help="Directory of the output files") + + return(p.parse_args()) + +def main(): + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + args = read_cli() + run_workflow(args.jobfile, args.outdir) + +if __name__ == '__main__': + main() diff --git a/scripts-py/pyproject.toml b/scripts-py/pyproject.toml index 486a81b1d512dc121614a1b500ca30e3f1f3b764..8bf3f99b5a9d417b8fa99f153641dfe64a8ef753 100644 --- a/scripts-py/pyproject.toml +++ b/scripts-py/pyproject.toml @@ -11,6 +11,14 @@ version = "0.1.0" [project.scripts] m100-data-downloader = "expe_energumen.m100_data_downloader:main" m100-agg-power-months = "expe_energumen.m100_agg_month_power_values:several_months" +m100-pred-preprocess1 = "expe_energumen.m100_pred_preprocess_1:main" +m100-pred-preprocess2 = "expe_energumen.m100_pred_preprocess_2:main" +m100-pred-jobs-extract-power-metrics = "expe_energumen.m100_pred_jobs_extract_power_metrics:main" +m100-pred-merge-jobfiles = "expe_energumen.m100_pred_merge_jobfiles:main" +run-prediction-per-user-allmethods-mean = "expe_energumen.run_prediction_per_user_allmethods_mean:main" +run-prediction-per-user-allmethods-max = "expe_energumen.run_prediction_per_user_allmethods_max:main" +predict-jobs-power-allmethods-mean = "expe_energumen.predict_jobs_power_allmethods_mean:main" +predict-jobs-power-allmethods-max = "expe_energumen.predict_jobs_power_allmethods_max:main" m100-agg-power-predictions = "expe_energumen.m100_agg_power_predictions:agg_all_files" m100-agg-jobs-info = "expe_energumen.m100_agg_jobs_info:several_months" m100-join-usable-jobs-info = "expe_energumen.m100_join_usable_jobs_info:join"
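For reviewers, the core of the historical predictor added in this diff (the weighted mean computed in `run_experiments`) can be exercised in isolation. The sketch below is a minimal, hypothetical re-implementation for illustration — the helper name `weighted_history_prediction` and the toy data are not part of the diff, but the weight formula `(1 - (t_submit - t_end) / history_size) ** power_factor` mirrors the one above, with `power_factor = 2.0` (the "quadratic" default).

```python
import pandas as pd

# Hypothetical standalone re-implementation of the weighted-history step
# from run_experiments(); the function name and toy data are illustrative only.
def weighted_history_prediction(prev_jobs, job_subm_time, first_job_timestamp, power_factor=2.0):
    """Weighted mean power of the user's previously finished jobs.

    Jobs that finished closer to the submission time of the job being
    predicted get a weight near 1; older jobs decay towards 0, raised
    to `power_factor` (2.0 is the "quadratic" setting in the scripts).
    """
    history_size = job_subm_time - first_job_timestamp
    weights = (1 - (job_subm_time - prev_jobs['end_time']) / history_size).pow(power_factor)
    return (prev_jobs['total_power_mean_watts'] * weights).sum() / weights.sum()

# Toy history: an old 50 W job and a recently finished 100 W job.
prev_jobs = pd.DataFrame({
    'end_time': [100.0, 900.0],               # seconds since the user's first job
    'total_power_mean_watts': [50.0, 100.0],
})
pred = weighted_history_prediction(prev_jobs, job_subm_time=1000.0, first_job_timestamp=0.0)
# The prediction lands much closer to the recent 100 W job than to the old 50 W one.
```

With the quadratic factor, the old job's weight is (1 - 900/1000)² = 0.01 against 0.81 for the recent one, so the recent job dominates the prediction.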