diff --git a/artifact-overview.typ b/artifact-overview.typ index 5a027b6e9549322c34c1f897cc8a8e7c607e31c8..b011c6ccf0b2a8784026c577097c109c947535c9 100644 --- a/artifact-overview.typ +++ b/artifact-overview.typ @@ -294,19 +294,6 @@ The step-by-step instructions of this document can be used in several ways *depe You can follow all steps below in this case, but *please do note that this is disk/bandwidth/computation-intensive.* -== Job power prediction <sec-job-power-pred> -How to reproduce the power predictions of jobs has not been written yet. -The expected output data of this section has however been stored on #link(zenodo-url)[Zenodo]. - -//#tododanilo[how to reproduce this experiment?] - -#fullbox(footer: [Disk: 82 Mo.])[ - #filehashes(( - "fdcc47998a7e998abde325162833b23e", "power_pred_users_allmethods_max.tar.gz", - "954f782a75c9a5b21c53a95c0218e220", "power_pred_users_allmethods_mean.tar.gz", - )) -] - == Analysis and modeling of the power behavior of Marconi100 nodes === Get power and job Marconi100 traces on your disk <sec-m100-power-job-traces> This section downloads parts of the Marconi100 trace as archives from #link("https://gitlab.com/ecs-lab/exadata")[the ExaData Zenodo files], checks that the archives have the right content (via MD5 checksums), extracts the data needed by later stages of the pipeline (node power usage traces, jobs information traces), then finally removes unneeded extracted files and the downloaded archives. @@ -381,6 +368,258 @@ Required input files. Their content should be completely reproducible though. ] +== Job power prediction <sec-job-power-pred> + +The experimental workflow consists of three parts, (i) preprocessing of the original data, and +(ii) prediction of the mean and maximum power consumption. 
+ +=== Pre-processing + +==== Step 1 + +#tododanilo[in the script source-code: change output filename of step 1 +from a_0_filter12_singlenode.csv and from a_0_filter12_multinode.csv +to 22-0X_filter12_singlenode.csv and 22-0X_filter12_multinode.csv] + +#fullbox(footer:[Memory: 128 Go. Time (sequential): 18:00:00])[ +```python + ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \ + -j ../m100-data/22-01_jobs.parquet \ + -p m100-data/22-01_power_total.parquet + ``` +```python + ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \ + -j ../m100-data/22-02_jobs.parquet \ + -p m100-data/22-02_power_total.parquet + ``` +```python + ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \ + -j ../m100-data/22-03_jobs.parquet \ + -p m100-data/22-03_power_total.parquet + ``` +```python + ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \ + -j ../m100-data/22-04_jobs.parquet \ + -p m100-data/22-04_power_total.parquet + ``` +```python + ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \ + -j ../m100-data/22-05_jobs.parquet \ + -p m100-data/22-05_power_total.parquet + ``` +```python + ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \ + -j ../m100-data/22-06_jobs.parquet \ + -p m100-data/22-06_power_total.parquet + ``` +```python + ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \ + -j ../m100-data/22-07_jobs.parquet \ + -p m100-data/22-07_power_total.parquet + ``` +```python + ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \ + -j ../m100-data/22-08_jobs.parquet \ + -p m100-data/22-08_power_total.parquet + ``` +```python + ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \ + -j ../m100-data/22-09_jobs.parquet \ + -p m100-data/22-09_power_total.parquet + ``` +] + +=== Step 2 +#tododanilo[in the script source-code: change output filename of step 2 +from a_0_filter123_singlenode.csv and from a_0_filter123_multinode.csv +to 22-0X_filter123_singlenode.csv and 22-0X_filter123_multinode.csv] + +#fullbox(footer:[Memory: 128 Go. 
Time (sequential): 66:00:00])[
+```python
+    ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
+    -js ./m100-data/22-01_filter12_singlenode.csv \
+    -jm ./m100-data/22-01_filter12_multinode.csv \
+    -p m100-data/22-01_power_total.parquet
+    ```
+
+```python
+    ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
+    -js ./m100-data/22-02_filter12_singlenode.csv \
+    -jm ./m100-data/22-02_filter12_multinode.csv \
+    -p m100-data/22-02_power_total.parquet
+    ```
+
+```python
+    ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
+    -js ./m100-data/22-03_filter12_singlenode.csv \
+    -jm ./m100-data/22-03_filter12_multinode.csv \
+    -p m100-data/22-03_power_total.parquet
+    ```
+
+```python
+    ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
+    -js ./m100-data/22-04_filter12_singlenode.csv \
+    -jm ./m100-data/22-04_filter12_multinode.csv \
+    -p m100-data/22-04_power_total.parquet
+    ```
+
+```python
+    ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
+    -js ./m100-data/22-05_filter12_singlenode.csv \
+    -jm ./m100-data/22-05_filter12_multinode.csv \
+    -p m100-data/22-05_power_total.parquet
+    ```
+
+```python
+    ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
+    -js ./m100-data/22-06_filter12_singlenode.csv \
+    -jm ./m100-data/22-06_filter12_multinode.csv \
+    -p m100-data/22-06_power_total.parquet
+    ```
+
+```python
+    ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
+    -js ./m100-data/22-07_filter12_singlenode.csv \
+    -jm ./m100-data/22-07_filter12_multinode.csv \
+    -p m100-data/22-07_power_total.parquet
+    ```
+
+```python
+    ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
+    -js ./m100-data/22-08_filter12_singlenode.csv \
+    -jm ./m100-data/22-08_filter12_multinode.csv \
+    -p m100-data/22-08_power_total.parquet
+    ```
+
+```python
+    ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
+    -js ./m100-data/22-09_filter12_singlenode.csv \
+    -jm ./m100-data/22-09_filter12_multinode.csv \
+    -p m100-data/22-09_power_total.parquet
+    ```
+
+]
+
+=== Aggregate step 2 output 
into a single file + +#fullbox(footer: [Disk: 32 Go.])[ +find . -name '*filter123*' | tar -zcvf exadata_job_energy_profiles.tar.gz --files-from - +] + +=== Compute power metrics and add job information + +#tododanilo[Script source-code: change -d (dir path) and pass the path to the necessary files] + +#fullbox(footer: [Disk: 32 Go.])[ +``` python + ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \ + -d ./data/year_month=22-01 +``` + +``` python + ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \ + -d ./data/year_month=22-02 +``` + +``` python + ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \ + -d ./data/year_month=22-03 +``` + +``` python + ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \ + -d ./data/year_month=22-04 +``` + +``` python + ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \ + -d ./data/year_month=22-05 +``` + +``` python + ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \ + -d ./data/year_month=22-06 +``` + +``` python + ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \ + -d ./data/year_month=22-07 +``` + +``` python + ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \ + -d ./data/year_month=22-08 +``` + +``` python + ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \ + -d ./data/year_month=22-09 +``` +] + +=== Merge files into a single CSV file + +This will output the `filter123_all_jobs_aggmetrics.csv.gz` needed for the prediction script + +#tododanilo[check if /m100-data/ path is correct and also the path of the output] + +#fullbox(footer: [Disk: 82 Mo.])[ + +``` python + ./scripts-py/expe_energumen/m100_pred_merge_jobfiles.py -d ./m100-data/ +``` +] + +== Predicting Job mean and maximum power consumption + +#fullbox(footer:[Memory: 128 Go. 
Time (sequential): 72:00:00])[
+```
+mkdir ./m100-data/total_power_mean_predictions_users_allmethods_mean
+mkdir ./m100-data/total_power_mean_predictions_users_allmethods_max
+
+python ./scripts-py/expe_energumen/run_prediction_per_user_allmethods_mean.py \
+    -j ./m100-data/filter123_all_jobs_aggmetrics.csv.gz \
+    -o ./m100-data/total_power_mean_predictions_users_allmethods_mean
+
+python ./scripts-py/expe_energumen/run_prediction_per_user_allmethods_max.py \
+    -j ./m100-data/filter123_all_jobs_aggmetrics.csv.gz \
+    -o ./m100-data/total_power_mean_predictions_users_allmethods_max
+```
+]
+
+=== Compressing prediction output into single files
+
+#fullbox(footer:[Disk: 82 Mo.])[
+```
+tar -cvzf ./m100-data/power_pred_users_allmethods_max.tar.gz \
+    ./m100-data/total_power_mean_predictions_users_allmethods_max
+tar -cvzf ./m100-data/power_pred_users_allmethods_mean.tar.gz \
+    ./m100-data/total_power_mean_predictions_users_allmethods_mean
+```
+]
+
+The expected output data has been stored on #link(zenodo-url)[Zenodo].
+
+//#tododanilo[how to reproduce this experiment?]
+
+#fullbox(footer: [Disk: 82 Mo.])[
+  #filehashes((
+    "fdcc47998a7e998abde325162833b23e", "power_pred_users_allmethods_max.tar.gz",
+    "954f782a75c9a5b21c53a95c0218e220", "power_pred_users_allmethods_mean.tar.gz",
+  ))
+]
+
+== Analyzing prediction results
+
+=== Required data
+Output from the previous section:
+
+- `m100-data/power_pred_users_allmethods_mean.tar.gz`, the jobs mean power predictions.
+- `m100-data/power_pred_users_allmethods_max.tar.gz`, the jobs maximum power predictions.
+
+#tododanilo[Add notebook that makes plots]
+
+
 == Job scheduling with power prediction <sec-sched>
 This section shows how to reproduce Sections 6.4 and 6.5 of article @lightpredenergy. 
diff --git a/scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py b/scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py new file mode 100644 index 0000000000000000000000000000000000000000..0ae9b2edf1f84d1b98a0d8dd38ccd716a99dede9 --- /dev/null +++ b/scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py @@ -0,0 +1,79 @@ +import argparse +import sys +import pandas as pd +import numpy as np +from scipy.stats import iqr + +""" +Read input data +""" +def read_data(rootdir): + df_jobs_single = pd.read_csv(rootdir+"/plugin=job_table/metric=job_info_marconi100/a_0_filter123_singlenode.csv") + df_jobs_multi = pd.read_csv(rootdir+"/plugin=job_table/metric=job_info_marconi100/a_0_filter123_multinode.csv") + df_power_single = pd.read_csv(rootdir+"/plugin=ipmi_pub/metric=total_power/a_0_filter123_singlenode.csv") + df_power_multi = pd.read_csv(rootdir+"/plugin=ipmi_pub/metric=total_power/a_0_filter123_multinode.csv") + df_jobs = pd.concat([df_jobs_single, df_jobs_multi]).reset_index(drop=True) + df_power = pd.concat([df_power_single, df_power_multi]).reset_index(drop=True) + df_power['node'] = pd.to_numeric(df_power['node']) + df_power['value'] = pd.to_numeric(df_power['value']) + return df_jobs, df_power + + +""" +Calculate jobs' power aggregation metrics +""" +def calculate_agg_metrics(df_jobs, df_power): + powertrace_jobs = [df_power[df_power["job_id"]==x]['value'].values for x in df_jobs['job_id'].values] + df_jobs["total_power_max_watts"] = [x.max() for x in powertrace_jobs] + df_jobs["total_power_mean_watts"] = [x.mean() for x in powertrace_jobs] + df_jobs["total_power_median_watts"] = [np.median(x) for x in powertrace_jobs] + df_jobs["total_power_std_watts"] = [x.std() for x in powertrace_jobs] + df_jobs["total_power_iqr_watts"] = [iqr(x, rng=(25, 75)) for x in powertrace_jobs] + df_jobs["total_power_1090range_watts"] = [iqr(x, rng=(10, 90)) for x in powertrace_jobs] + return df_jobs + + +""" +Save results +""" +def 
save_results(df_jobs_aggmetrics, rootdir): + df_jobs_aggmetrics.to_csv(rootdir+"/plugin=job_table/metric=job_info_marconi100/a_0_filter123_aggmetrics.csv") + +""" +Run workflow +""" +def run_workflow(rootdir): + df_jobs, df_power = read_data(rootdir) + df_jobs_aggmetrics = calculate_agg_metrics(df_jobs, df_power) + print(df_jobs_aggmetrics[["run_time", + "num_nodes", + "total_power_max_watts", + "total_power_mean_watts", + "total_power_median_watts"]]) + save_results(df_jobs_aggmetrics, rootdir) + +""" +Read Command line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Process ExaData data to extract per-job power metrics. This script uses the output from process_marconi_jobs_2.py') + + p.add_argument("--rootdir", "-d", type=str, required=True, + help="Root directory of the trace") + + return(p.parse_args()) + +if __name__ == '__main__': + + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + try: + args = read_cli() + except: + print('Try $python process_marconi_jobs.py --help') + sys.exit(1) + + run_workflow(args.rootdir) \ No newline at end of file diff --git a/scripts-py/expe_energumen/m100_pred_merge_jobfiles.py b/scripts-py/expe_energumen/m100_pred_merge_jobfiles.py new file mode 100644 index 0000000000000000000000000000000000000000..3d3344681c3e4bed4bf25d7e7e087b14acb1855b --- /dev/null +++ b/scripts-py/expe_energumen/m100_pred_merge_jobfiles.py @@ -0,0 +1,58 @@ +import glob +import pandas as pd +import sys +import argparse + +""" +Read job files spread in the months folders +""" +def read_jobifles(rootdir): + #DATASET_PATH = "/home/dancarastan/Documentos/exadata_job_energy_profiles/" + + jobfiles_list = glob.glob(rootdir+"*"+"/plugin=job_table"+"/metric=job_info_marconi100"+"/a_0_filter123_aggmetrics.csv") + + #print(len(jobfiles_list)) + df_jobs = pd.concat([pd.read_csv(jobfile) for jobfile in jobfiles_list]).reset_index(drop=True) + return 
df_jobs + + +""" +Save results to compressed csv +""" +def save_results(df_jobs, rootdir): + df_jobs.to_csv(rootdir+"/filter123_all_jobs_aggmetrics.csv.gz", index=False) + + +""" +Run workflow +""" +def run_workflow(rootdir): + df_jobs = read_jobifles(rootdir) + save_results(df_jobs, rootdir) + + +""" +Read Command line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Concatenate job table files into a single csv file') + + p.add_argument("--rootdir", "-d", type=str, required=True, + help="Root directory of the trace") + + return(p.parse_args()) + +if __name__ == '__main__': + + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + try: + args = read_cli() + except: + print('Try $python process_marconi_jobs.py --help') + sys.exit(1) + + run_workflow(args.rootdir) \ No newline at end of file diff --git a/scripts-py/expe_energumen/m100_pred_preprocess_1.py b/scripts-py/expe_energumen/m100_pred_preprocess_1.py new file mode 100644 index 0000000000000000000000000000000000000000..182f632b4ae0af5aa12f23a0c70ae004cba27b97 --- /dev/null +++ b/scripts-py/expe_energumen/m100_pred_preprocess_1.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +import sys +import argparse +import ast +import pandas as pd +import numpy as np + +""" +Read Data +""" +def read_data(jobfile, metricfile): + df_jobs = pd.read_parquet(jobfile) + df_power = pd.read_parquet(metricfile) + ## I call here df_power, but in reality it can be any metric + return df_jobs, df_power + + +""" +Convert some string values to numeric +Convert timestamps to seconds for job runtime +""" +def preprocess_data(df_jobs, df_power): + df_power['node'] = pd.to_numeric(df_power['node']) + df_power['value'] = pd.to_numeric(df_power['value']) + df_jobs["run_time"] = (df_jobs['end_time']-df_jobs['start_time']) / np.timedelta64(1, 's') + print("Total number of jobs: ", len(df_jobs)) + return df_jobs, df_power + + +""" 
+Filter 1: remove jobs that take less than a minute +""" +def filter1(df_jobs): + SECONDS_ONE_MINUTE = 60.0 + + df_jobs_f1 = df_jobs.loc[df_jobs["run_time"] >= SECONDS_ONE_MINUTE] + print("Number of jobs after filter 1: ", str(len(df_jobs_f1))) + return df_jobs_f1 + + +""" +Filter 2: remove jobs with no energy profile +Method for single-node jobs +""" +def filter2_single(df_jobs, df_power): + df_jobs_f1_single = df_jobs[df_jobs["num_nodes"]==1] + + #removing malformed "nodes" + df_jobs_f1_single = df_jobs_f1_single[df_jobs_f1_single["nodes"].str.match("\[\d+\]")].copy().reset_index(drop=True) + df_jobs_f1_single["node"] = [ast.literal_eval(x)[0] for x in df_jobs_f1_single['nodes']] + + #use this for smaller inputs + #sample = df_jobs_f1_single[0:10000].copy().reset_index(drop=True) + + sample = df_jobs_f1_single + + group = df_power.groupby(by="node") + + available_nodes = group.groups.keys() + + #for debugging + #result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()] + + #i know that 0 is node, 1 is start_time, and 2 is end_time + result = [any(group.get_group(x[0])["timestamp"].between(x[1], x[2])) if x[0] in available_nodes else False for x in sample[["node", "start_time", "end_time"]].values.tolist()] + + sample["has_profile"] = result + df_jobs_f12_single = sample[sample["has_profile"]==True] + + print("Number of jobs after filter 2 - single: ", str(len(df_jobs_f12_single))) + return df_jobs_f12_single + + +""" +Filter 2: remove jobs with no energy profile +Method for multi-node jobs +""" +def filter2_multi(df_jobs, df_power): + df_jobs_f1_multi = df_jobs[df_jobs["num_nodes"] > 1].copy().reset_index(drop=True) + + #removing malformed "nodes" + df_jobs_f1_multi["node"] = [ast.literal_eval(x) for x in df_jobs_f1_multi['nodes']] + + #use this for smaller inputs + #sample = df_jobs_f1_multi[0:10].copy().reset_index(drop=True) + + sample = df_jobs_f1_multi + + group = 
df_power.groupby(by="node") + + available_nodes = set(group.groups.keys()) + + #for debugging + #result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()] + + #i know that 0 is node, 1 is start_time, and 2 is end_time + result = [all([any(group.get_group(y)["timestamp"].between(x[1], x[2])) for y in x[0]]) if set(x[0]).issubset(available_nodes) else False for x in sample[["node", "start_time", "end_time"]].values.tolist()] + + sample["has_profile"] = result + df_jobs_f12_multi = sample[sample["has_profile"]==True] + print("Number of jobs after filter 2 - multi: ", str(len(df_jobs_f12_multi))) + return df_jobs_f12_multi + + +""" +Save intermediate results to csv +""" +def save_results(df_jobs_single, df_jobs_multi, jobfile, metricfile): + jobfile_out = jobfile.rstrip("a_0.parquet") + metric = metricfile.split("/")[-2] + df_jobs_single.to_csv(jobfile_out+metric+"_filter12_singlenode.csv", index=False) + df_jobs_multi.to_csv(jobfile_out+metric+"_filter12_multinode.csv", index=False) + +""" +Run workflow +""" +def run_workflow(metricfile, jobfile): + df_jobs, df_power = read_data(jobfile=jobfile, metricfile=metricfile) + df_jobs, df_power = preprocess_data(df_jobs=df_jobs, df_power=df_power) + df_jobs = filter1(df_jobs) + df_jobs_single = filter2_single(df_jobs, df_power) + df_jobs_multi = filter2_multi(df_jobs, df_power) + save_results(df_jobs_single, df_jobs_multi, jobfile, metricfile) + +""" +Read Command line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Process ExaData data to extract per-job energy profiles') + + p.add_argument("--metricfile", "-m", type=str, required=True, + help="Metric file") + p.add_argument("--jobfile", "-j", type=str, required=True, + help="Job table file") + + return(p.parse_args()) + +if __name__ == '__main__': + + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run 
this script\n") + sys.exit(1) + + try: + args = read_cli() + except: + print('Try $python process_marconi_jobs.py --help') + sys.exit(1) + + run_workflow(args.metricfile, args.jobfile) \ No newline at end of file diff --git a/scripts-py/expe_energumen/m100_pred_preprocess_2.py b/scripts-py/expe_energumen/m100_pred_preprocess_2.py new file mode 100644 index 0000000000000000000000000000000000000000..33b3d2e142dbdc326c91e42c69179baa6ff43de6 --- /dev/null +++ b/scripts-py/expe_energumen/m100_pred_preprocess_2.py @@ -0,0 +1,246 @@ +import argparse +import sys +import ast +import multiprocessing +import functools +import numpy as np +import pandas as pd + +""" +Read data +Output from process_marconi_jobs_1.py +""" +def read_data(jobfile_single, jobfile_multi, metricfile): + df_jobs_single = pd.read_csv(jobfile_single) + df_jobs_multi = pd.read_csv(jobfile_multi) + + ## Here i refer to total_power but in reality it can be any metric + df_total_power = pd.read_parquet(metricfile) + df_total_power['node'] = pd.to_numeric(df_total_power['node']) + df_total_power['value'] = pd.to_numeric(df_total_power['value']) + return df_jobs_single, df_jobs_multi, df_total_power + + +""" +Filter 3: find the jobs that share a node +The idea is to add a new column to `df_total_power` +with a list of jobs that run on each node at each timestamp. 
+Then we filter jobs that appear sharing with another job +""" +def filter3_1_single(df_jobs_single, df_total_power): + #removing malformed "nodes" + df_jobs_single["node"] = [ast.literal_eval(x)[0] for x in df_jobs_single['nodes']] + + #use this for smaller inputs + #TEST_SAMPLE_SIZE = 1000 + + TEST_SAMPLE_SIZE = len(df_jobs_single) + sample = df_jobs_single[0:TEST_SAMPLE_SIZE].copy().reset_index(drop=True) + + #sample = df_jobs_single + + group = df_total_power.groupby(by="node") + + #for debugging + #result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()] + + #i know that x[0] is node, x[1] is start_time, and x[2] is end_time + #each element of the list is a subseries of the total_power of a certain mode + #with "1" at the timestamps where the "job_id" ran + result = [[group.get_group(x[0])["timestamp"].between(x[1], x[2]).astype(int).replace(0, np.nan).dropna().astype(str), x[3]] for x in sample[["node", "start_time", "end_time", "job_id"]].values.tolist()] + + # now i replace the "1"s with the "job_id" + #each element of the list is a subseries of the total_power of a certain mode + #with the "job_id" value at the timestamps where the "job_id" ran + result2 = [x[0].replace("1.0", str(x[1])) for x in result] + #print(result2) + + #finally i concatenate each of the series by index + # while joining the values where indices overlap + # i hope the indices here are consistent with the indices in df_total_power + #concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).replace(to_replace=r'^(0,)*0$', value="0", regex=True) + concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).rename("job_ids") + + joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="inner") + print(joined_total_power) + return joined_total_power + + +""" +Filter 3: find the jobs that not share a node +Based on 
filter3_single_1, return the list of job ids that +not shared a node with another job +""" +def filter3_2(joined_total_power): + all_job_ids=[] + _ = pd.Series([[all_job_ids.append(y) for y in x.split(",")] for x in joined_total_power["job_ids"].drop_duplicates().values.tolist()]) + + all_job_ids = pd.Series(all_job_ids).drop_duplicates() + + nodeshare_job_ids = [] + _ = pd.Series([[nodeshare_job_ids.append(y) for y in x.split(",") if len(x.split(",")) > 1] for x in joined_total_power["job_ids"].drop_duplicates().values.tolist()]) + nodeshare_job_ids = pd.Series(nodeshare_job_ids).drop_duplicates() + + exclusive_job_ids = all_job_ids[~all_job_ids.isin(nodeshare_job_ids)] + print("Nodeshare Job ratio: ", len(nodeshare_job_ids)/len(all_job_ids)) + print(exclusive_job_ids) + return exclusive_job_ids + +""" +Filter 3: find the jobs that not share a node +Getting the profile of the not nodeshared jobs +And details of the exclusive jobs +""" +def filter3_3(df_jobs_single, exclusive_job_ids, joined_total_power): + result = [x for x in joined_total_power.values if np.any(pd.Series(x[3].split(",")).isin(exclusive_job_ids).values) == True] + df_total_power_exclusive_single = pd.DataFrame(result, columns=["timestamp", "value", "node", "job_id"]) + df_exclusive_jobs = df_jobs_single[df_jobs_single["job_id"].isin(exclusive_job_ids.astype(int))] + return df_total_power_exclusive_single, df_exclusive_jobs + +""" +Filter 3: Define the function to check the condition in parallel +""" +def check_condition_f3(x, exclusive_job_ids=None): + return np.any(pd.Series(x[3].split(",")).isin(exclusive_job_ids).values) + +""" +Filter 3: find the jobs that not share a node +Getting the profile of the not nodeshared jobs +And details of the exclusive jobs +Attempt to do this in parallel +""" +def filter3_3_par(df_jobs, exclusive_job_ids, joined_total_power): + # Use multiprocessing.Pool to parallelize the list comprehension + pool = multiprocessing.Pool() + result = 
pool.map(functools.partial(check_condition_f3, exclusive_job_ids=exclusive_job_ids), joined_total_power.values) + + # Filter the values based on the condition + result = [x for x, res in zip(joined_total_power.values, result) if res] + + df_total_power_exclusive_single = pd.DataFrame(result, columns=["timestamp", "value", "node", "job_id"]) + df_exclusive_jobs = df_jobs[df_jobs["job_id"].isin(exclusive_job_ids.astype(int))] + return df_total_power_exclusive_single, df_exclusive_jobs + + +""" +Filter 3: find the jobs that share a node +The idea is to add a new column to `df_total_power` +with a list of jobs that run on each node at each timestamp. +Then we filter jobs that appear sharing with another job +Version for multi-node jobs +""" +def filter3_1_multi(df_jobs_multi, df_total_power): + df_jobs_multi["node"] = [ast.literal_eval(x) for x in df_jobs_multi['nodes']] + + #use this for smaller inputs + #TEST_SAMPLE_SIZE = 1000 + + TEST_SAMPLE_SIZE = len(df_jobs_multi) + sample = df_jobs_multi[0:TEST_SAMPLE_SIZE].copy().reset_index(drop=True) + + #sample = df_jobs_single + + group = df_total_power.groupby(by="node") + + #for debugging + #result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()] + + #i know that x[0] is the node list, x[1] is start_time, and x[2] is end_time + #each element of the list is a subseries of the total_power of a certain mode + #with "1" at the timestamps where the "job_id" ran + # i replace the "0"s (where the timestamp is not betweenx[1], x[2]) with nans just to remove these values with dropna. 
+ result = [[[group.get_group(y)["timestamp"].between(x[1], x[2]).astype(int).replace(0, np.nan).dropna().astype(str) for y in x[0]], x[3]] for x in sample[["node", "start_time", "end_time", "job_id"]].values.tolist()] + + #print(type(group.get_group(512))) + #group.get_group(x[0]) + + #result[0] = Series with timestamps with True where the job run, and False otherwise + #result[1] = job id + #result.values.to_list() + + # now i replace the "1"s with the "job_id" + #each element of the list is a subseries of the total_power of a certain mode + #with the "job_id" value at the timestamps where the "job_id" ran + result2 = [pd.concat([y.replace("1.0", str(x[1])) for y in x[0]]) for x in result] + + #finally i concatenate each of the series by index + # while joining the values where indices overlap + # i hope the indices here are consistent with the indices in df_total_power + #concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).replace(to_replace=r'^(0,)*0$', value="0", regex=True) + + concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).rename("job_ids") + #print(concatenated_series) + + # use the below code for the full run + #joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="left") + + #use the below code for a rest run + joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="inner") + print(joined_total_power) + return joined_total_power + +""" +Save results to csv +""" +def save_results(df_exclusive_jobs_single, df_exclusive_jobs_multi, df_total_power_exclusive_single, df_total_power_exclusive_multi, jobfile_single, metricfile): + metric = metricfile.split("/")[-2] + jobfile_out = jobfile_single.rstrip(metric+"_filter12_singlenode.csv") + metricfile_out = metricfile.rstrip("a_0.parquet") + df_exclusive_jobs_single.to_csv(jobfile_out+metric+"_filter123_singlenode.csv", index=False) + 
df_exclusive_jobs_multi.to_csv(jobfile_out+metric+"_filter123_multinode.csv", index=False) + df_total_power_exclusive_single.to_csv(metricfile_out+"a_0_filter123_singlenode.csv", index=False) + df_total_power_exclusive_multi.to_csv(metricfile_out+"a_0_filter123_multinode.csv", index=False) + + +""" +Run workflow +""" +def run_workflow(metricfile, jobfile_single, jobfile_multi): + df_jobs_single, df_jobs_multi, df_total_power = read_data(jobfile_single, jobfile_multi, metricfile) + #Single-node jobs workflow + joined_total_power = filter3_1_single(df_jobs_single, df_total_power) + exclusive_job_ids = filter3_2(joined_total_power) + #df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_single_3(df_jobs_single, exclusive_job_ids, joined_total_power) + df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_3_par(df_jobs_single, exclusive_job_ids, joined_total_power) + print(df_total_power_exclusive_single) + print(df_exclusive_jobs_single) + ############################### + #Multi-node jobs workflow + joined_total_power = filter3_1_multi(df_jobs_multi, df_total_power) + exclusive_job_ids = filter3_2(joined_total_power) + #df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_single_3(df_jobs_single, exclusive_job_ids, joined_total_power) + df_total_power_exclusive_multi, df_exclusive_jobs_multi = filter3_3_par(df_jobs_multi, exclusive_job_ids, joined_total_power) + print(df_total_power_exclusive_multi) + print(df_exclusive_jobs_multi) + save_results(df_exclusive_jobs_single, df_exclusive_jobs_multi, df_total_power_exclusive_single, df_total_power_exclusive_multi, jobfile_single, metricfile) + ############################### + +""" +Read Command line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Process ExaData data to extract per-job energy profiles') + + p.add_argument("--metricfile", "-m", type=str, required=True, + help="Metric file") + p.add_argument("--jobfilesingle", 
"-js", type=str, required=True, + help="Job table file for single-node jobs (output from process_marconi_jobs_1.py)") + p.add_argument("--jobfilemulti", "-jm", type=str, required=True, + help="Job table file for multi-node jobs (output from process_marconi_jobs_1.py)") + + return(p.parse_args()) + +if __name__ == '__main__': + + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + try: + args = read_cli() + except: + print('Try $python process_marconi_jobs.py --help') + sys.exit(1) + + run_workflow(args.metricfile, args.jobfilesingle, args.jobfilemulti) \ No newline at end of file diff --git a/scripts-py/expe_energumen/predict_jobs_power_allmethods_max.py b/scripts-py/expe_energumen/predict_jobs_power_allmethods_max.py new file mode 100644 index 0000000000000000000000000000000000000000..7555644c217f9313ede1aa8da35dc3871fc6a651 --- /dev/null +++ b/scripts-py/expe_energumen/predict_jobs_power_allmethods_max.py @@ -0,0 +1,490 @@ +#!/usr/bin/env python3 +import sys +import argparse +import numpy as np +import pandas as pd +from sklearn.model_selection import train_test_split +from sklearn.linear_model import LinearRegression, Lasso, Ridge, SGDRegressor +from sklearn.svm import SVR, LinearSVR +from sklearn.ensemble import HistGradientBoostingRegressor, RandomForestRegressor +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import StandardScaler +from sklearn.model_selection import train_test_split, GridSearchCV +from sklearn.feature_selection import RFE +from sklearn.metrics import r2_score, mean_absolute_percentage_error, mean_absolute_error, mean_squared_error +from itertools import product + +""" +Initial variables +""" +## Time constants +NANOSECONDS_ONE_SECOND = 1e9 +SEC_ONE_MINUTE=60 +SEC_ONE_HOUR=60*SEC_ONE_MINUTE +SEC_ONE_DAY=24*SEC_ONE_HOUR +SEC_ONE_WEEK=7*SEC_ONE_DAY + +# Experiments parameters +## values for job submission time difference penalization 
+POWER_FACTORS={"sublinear": 0.5, + "linear": 1.0, + "quadratic": 2.0} +## Predict using only jobs with the same nb of resources +SAME_NB_RESOURCES={"yes": True, + "no": False} + + +## List of models used in the regression +MODELS=[ + {'key':'LinearRegression', 'value':LinearRegression, 'kwargs': {'n_jobs' : -1}}, + {'key':'RandomForestRegressor', 'value':RandomForestRegressor, 'kwargs': {'n_jobs' : -1}}, + {'key':'LinearSVR', 'value':LinearSVR, 'kwargs': {'max_iter' : 100000}}, + {'key':'SGDRegressor', 'value':SGDRegressor, 'kwargs': {'max_iter' : 100000}}, +] +MODEL_NAMES=['LinearRegression','RandomForestRegressor','LinearSVR','SGDRegressor'] + +## train model only if the user's number of jobs is at least MIN_NB_JOBS +MIN_NB_JOBS=[10] +## Further reduce train set size (used for testing, default must be 0.0) +## For info: regression train/test split is 0.66 train and 0.33 test +#PERCENTAGE_TRAINING_DATA=[0.75,0.5,0.25,0.0] +PERCENTAGE_TRAINING_DATA=[0.0] +## Columns for regression training experiments parameters +EXP_REG_COLUMNS=['min_reg_nb_jobs', 'model', 'percentage_training_data'] +## Columns for historical prediction experiments parameters +EXP_HIST_COLUMNS=['power_factor', 'same_nb_resources'] +## Job Features +# I could add "sime_limit_str" and "tres_per_node" but this needs more preprocessing +FEATURE_LABELS=['num_cpus', 'num_nodes', 'submission_date_hour', "walltime"] +#FEATURE_LABELS=['num_cpus','submission_date_hour', 'nb_sockets'] +## Lag features used in regression +LAG_FEATURE_LABELS=['lag_total_power_mean', 'lag_total_power_std', 'lag_total_power_prev'] +## The value we want to predict +TARGET_LABEL='total_power_max_watts' + +SVR_TO_LINEARSVR_THRESHOLD = 10000 + +# Define the parameter grid +param_grid = { + 'SVR':{ + 'svr__C': np.logspace(-3, 3, 7), + 'svr__gamma': np.logspace(-3, 3, 7), + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'LinearSVR': { + 'linearsvr__C': np.logspace(-3, 3, 7), # Use 
'linearsvr__C' for LinearSVR + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'RandomForestRegressor': { + 'randomforestregressor__n_estimators': [10, 50, 100, 200], + 'randomforestregressor__max_depth': [None, 10, 20, 30], + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'LinearRegression': { + #'linearregression__': {}, + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'SGDRegressor': { + 'sgdregressor__alpha': np.logspace(-3, 3, 7), + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + } +} + +## Create SKlearn pipeline according to the model +def create_pipeline(model_conf): + model_obj = model_conf["value"](**model_conf["kwargs"]) + pipeline = make_pipeline(StandardScaler(), RFE(LinearRegression(n_jobs=-1)), model_obj) + return pipeline + +# Function to convert time strings to total seconds +def time_str_to_seconds(time_str): + #print(time_str) + components = time_str.split('-') + + if len(components) == 1: # No days component + time_parts = time_str.split(':') + if len(time_parts) == 1: # Format M:SS + time_str = "0:" + time_str + elif len(time_parts) == 2: # Format MM:SS + time_str = "00:" + time_str + return pd.to_timedelta(time_str).total_seconds() + elif len(components) == 2: # Format DD-HH:MM:SS + days, time_part = components + time_seconds = pd.to_timedelta(time_part).total_seconds() + return int(days) * 24 * 3600 + time_seconds + else: + return 180 * SEC_ONE_DAY # Handle infinite = 180 days = 6 months + +""" +Read and prepare data to use in the prediction +""" +def read_data(jobfile, user): + ## Holds the first job of each user (key=user_id) + dct_users_first_job_timestamp={} + + ## Holds a dataframe of jobs for each user (key=user_id) + jobs_users={} + + ## Read jobfile + df_jobs = pd.read_csv(jobfile, low_memory=False) + + ## Convert datetime columns to integer timestamps 
in seconds + df_jobs["submit_time"] = pd.to_datetime(df_jobs['submit_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["start_time"] = pd.to_datetime(df_jobs['start_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["end_time"] = pd.to_datetime(df_jobs['end_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["walltime"] = df_jobs['time_limit_str'].apply(time_str_to_seconds) + + lst_users = [user] + + ## Sort jobs by submission time + df_jobs=df_jobs.sort_values(by='submit_time').reset_index(drop=True) + ## Get (back) datetime objects + df_jobs['submission_datetime'] = pd.to_datetime(df_jobs['submit_time'], unit='s') + df_jobs['submission_date_hour'] = pd.DatetimeIndex(df_jobs['submission_datetime']).hour + + ## Initialize new columns + df_jobs['user_submission_week']=np.nan + df_jobs['lag_total_power_mean']=np.nan + df_jobs['lag_total_power_std']=np.nan + df_jobs['lag_total_power_prev']=np.nan + df_jobs['hist_pred_total_power_max']=np.nan + #df_jobs['reg_pred_total_power_mean']=np.nan + col_names = [model_name+"_"+TARGET_LABEL for model_name in MODEL_NAMES] + _ = [df_jobs.assign(x=np.nan) for x in col_names] + + ## Populate jobs_users and dct_users_first_job_timestamp + ## Also computing the week number of the job submission + for user in lst_users: + jobs_user = df_jobs.loc[df_jobs['user_id']==user].copy().sort_values(by='submit_time').reset_index() + dct_users_first_job_timestamp[user] = jobs_user.head(1)['submit_time'].values[0] + first_job_timestamp = dct_users_first_job_timestamp[user] + jobs_user['user_submission_week'] = (jobs_user['submit_time']-first_job_timestamp)//SEC_ONE_WEEK + jobs_users[user]=jobs_user + #print(user, len(jobs_users[user])) + #print(jobs_user) + #break + + return lst_users, jobs_users, dct_users_first_job_timestamp + +""" +Run prediction experiments +""" +def run_experiments(hist_design, + reg_design, + jobs_users, + dct_users_jobs, + dct_users_first_job_timestamp, + lst_results, + lst_predicted_jobs, + user=None): + + ## 
Initialize the dict for the week + dct_users_jobs[user] = {} + + ## Get the jobs dataframe of the user + jobs_user = jobs_users[user] + + ## Get the list of weeks that user submitted jobs (e.g., [0.0, 1.0, 3.0]) + jobs_weeks=jobs_user['user_submission_week'].drop_duplicates().sort_values().to_list() + + + ## Get te timestamp of the user's first job + first_job_timestamp = dct_users_first_job_timestamp[user] + + ## Run the prediction routine for each week the user submitted jobs + for week in jobs_weeks: + ## Get the dataframe of user's jobs at week + user_jobs_week = jobs_user.loc[jobs_user['user_submission_week']==week].copy() + + ## for each job in week + for index, job in user_jobs_week.iterrows(): + if index == 0: + continue + else: + ################################################################## + ### Weighted Mean of up previous finished jobs in the jobs history + ### add the mean power of previous user's jobs + ### (weighted history approach) as features + ################################################################## + + ## Submission timestamp of the job + job_subm_time = job['submit_time'] + + ## Timestamp where the history will start + history_start = first_job_timestamp + + ## History window size (start counting since the first user job submission) + history_size = job_subm_time - history_start + + ## Get only jobs that finished before the row submission time + prev_jobs = jobs_user.loc[jobs_user['end_time']<=job_subm_time].copy().sort_values(by='end_time').reset_index(drop=True) + + ## Further filter jobs if same_nb_resources (evals to False by default) + if hist_design['same_nb_resources'].values[0] == True: + prev_jobs=prev_jobs.loc[prev_jobs['nb_resources']==job['nb_resources']] + + if len(prev_jobs) < 1: + ## No job has finished before job_subm_time + continue + else: + ## Compute the standard deviation of the jobs in the history (lag feature) + user_jobs_week.loc[index, 'lag_total_power_std']=prev_jobs[TARGET_LABEL].std() + + ## Assign 
weights for the previous jobs in the history according to the power_factor + prev_jobs['weight'] = 1 - ((job_subm_time - prev_jobs['end_time'])/history_size) + prev_jobs['weight'] = prev_jobs['weight'].pow(hist_design['power_factor'].values[0]) + + ## Compute the mean of the jobs in the history (lag feature) + user_jobs_week.loc[index,'lag_total_power_mean'] = (prev_jobs[TARGET_LABEL]*prev_jobs['weight']).sum()/prev_jobs['weight'].sum() + + ## Compute the mean of the jobs in the history (historical prediction result) + user_jobs_week.loc[index,'hist_pred_total_power_max'] = user_jobs_week.loc[index,'lag_total_power_mean'] + + ## Get the power metric of the most recent user job (lag feature, not used by default) + user_jobs_week.loc[index,'lag_total_power_prev']=prev_jobs.iloc[-1][TARGET_LABEL] + + ## Add the user's week jobs in the dict + dct_users_jobs[user][week]=user_jobs_week + + prev_weeks=[] + + #################################################################################### + ### Regression multi-model power prediction + ### For each week >= 1 use the first 2/3 of the user's finished job history as train + ### and the remaining 1/3 as test + ### Run regression fitting for each of the models in MODELS + ### Select the best regression model regarding the test mean squared error (MSE) + ### Compare with the MSE of the historic prediction on the test + ### For week+1 use the model (regression or historic) with the lowest MSE + #################################################################################### + for i, week in enumerate(jobs_weeks): + + ## We skip the first week for regression + if i == 0: + prev_weeks.append(week) + continue + + ## Get the user's jobs at current week (only jobs with historic data prediction) + user_jobs_week=dct_users_jobs[user][week] + + ## Get all previous jobs for regression fitting + model_jobs_user=pd.concat([dct_users_jobs[user][w] for w in prev_weeks]).dropna(subset=LAG_FEATURE_LABELS) + + ## Add week in the list of 
previous weeks to get data + ### This will be used at the next week iteration + prev_weeks.append(week) + + ##DEBUG + #print(len(model_jobs_user)) + + regression=False + + ## Iterate over the experiments parameters (which vary the models) + for index, exp in reg_design.iterrows(): + ## Only do regression training if the number of jobs to train > min_reg_nb_jobs (10 by default) + if len(model_jobs_user) >= exp['min_reg_nb_jobs']: + regression=True + + ## Append result data to the experiments parameters dict + dct_result = exp.to_dict() + dct_result['user_id']=user + dct_result['user_week']=week + dct_result['hist_power_factor']=hist_design['power_factor'].values[0] + dct_result['hist_same_nb_resources']=hist_design['same_nb_resources'].values[0] + + ## Select features and target label + X=model_jobs_user[FEATURE_LABELS+LAG_FEATURE_LABELS] + y=model_jobs_user[TARGET_LABEL] + + ## Train test split + ## Test split is important if we want to switch between algortihms at each week + #X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, shuffle=False) + + ## With this we only evaluate the training performance. + ## This is ok, since X, y are jobs from previous weeks. 
+ ## We "test" the models at the current week, which is not present in X, y + X_train, y_train = X, y + + ## exp['percentage_training_data'] evals to 0 -> nothing changes + X_train = X_train.iloc[int(len(X_train)*exp['percentage_training_data']):len(X_train)] + y_train = y_train.iloc[int(len(y_train)*exp['percentage_training_data']):len(y_train)] + + ## Create an ML pipeline of standard scaling -> model and then train the model + + pipeline = create_pipeline(exp["model"]) + + #print("Week ", week, "Model: ", model, "Train Size: ", len(X_train)) + + reg = GridSearchCV(pipeline, param_grid[exp["model"]["key"]], cv=5, scoring='neg_mean_squared_error', n_jobs=-1).fit(X_train, y_train) + + ## Append result data to the experiments parameters dict + dct_result['model_name']=exp['model']['key'] + + best_predictor = reg.best_estimator_ + dct_result['model_object']=best_predictor + + ## Predict on the test dataset + test_predictions=best_predictor.predict(X_train) + + ## Append result data to the experiments parameters dict + dct_result['model']=exp['model']['key'] + + # Access the selected features using the RFE estimator from the best model + best_rfe = reg.best_estimator_.named_steps['rfe'] + selected_features = np.where(best_rfe.support_)[0] + + cols = FEATURE_LABELS+LAG_FEATURE_LABELS + feature_names = [cols[x] for x in selected_features] + dct_result['features']=feature_names + #print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names) + dct_result['reg_total_power_max_mape']=mean_absolute_percentage_error(y_train, test_predictions) + dct_result['reg_total_power_max_mae']=mean_absolute_error(y_train, test_predictions) + dct_result['reg_total_power_max_mse']=mean_squared_error(y_train, test_predictions) + + print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names, "MAPE: ", dct_result['reg_total_power_max_mape'], "MAE: ", dct_result['reg_total_power_max_mae'], "MSE: ", 
dct_result['reg_total_power_max_mse']) + ### Results of the historic data power prediction + dct_result['hist_total_power_max_mape']=mean_absolute_percentage_error(y_train, X_train['lag_total_power_mean']) + dct_result['hist_total_power_max_mse']=mean_squared_error(y_train, X_train['lag_total_power_mean']) + + ## Append to the global list of training results + lst_results.append(dct_result) + + ## If regression training was performed using prev_weeks, select the best model for week + if regression == True: + + ## Get the week prediction results from the global list lst_results + df_week_results=pd.DataFrame(lst_results) + df_week_results=df_week_results.loc[df_week_results['user_week']==week] + + #print(df_week_results) + + ## Predict for week using all models + for _, result in df_week_results.iterrows(): + model_name = result['model_name'] + model_object = result['model_object'] + + col_label = model_name+"_"+TARGET_LABEL + user_jobs_week[col_label]=model_object.predict(user_jobs_week[FEATURE_LABELS+LAG_FEATURE_LABELS]) + + lst_predicted_jobs.append(user_jobs_week) + + return lst_predicted_jobs + +""" +Start prediction routine +""" +def predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp): + # Construct the design of experiments for the prediction + ## Historical prediction design + ## (POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"]) evals to (2.0, False) + design_hist_pred=[(POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"])] + + ## Regression design + ## MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA evals to [10], MODELS, [0.0] + ## I switch between SVR and LinearSVR if the training dataset is larger than 10k, so MODELS are temporarily ignored + design_reg_pred=list(product(MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA)) + + df_hist_design=pd.DataFrame(design_hist_pred, columns=EXP_HIST_COLUMNS) + df_reg_design=pd.DataFrame(design_reg_pred, columns=EXP_REG_COLUMNS) + + ## Holds the list of regression results (TO VERIFY) + lst_results=[] + 
## Holds the list of predicted jobs + ## Each position is a dataframe of predicted jobs of a user + lst_predicted_jobs=[] + ## Holds the weekly job submission for each user + ## Key is [user][week] + dct_users_jobs={} + + #for index, exp in df_ff_design.iterrows(): + for user in lst_users: + #print('user: ', user) + dct_users_jobs[user] = {} + lst_predicted_jobs = run_experiments(df_hist_design, + df_reg_design, + jobs_users, + dct_users_jobs, + dct_users_first_job_timestamp, + lst_results, + lst_predicted_jobs, + user=user) + #print(str(index) + ' of ' + str(len(df_ff_design)), end='\r') + + df_predicted_jobs = [] + + if lst_predicted_jobs: + df_predicted_jobs=pd.concat(lst_predicted_jobs) + + if len(df_predicted_jobs) != 0: + df_predicted_jobs=df_predicted_jobs.dropna(subset=['total_power_max_watts','hist_pred_total_power_max']) + + ## Print results stats, no longer needed + """ + for user in lst_users: + #print('user: ', user) + df_predicted_user=df_predicted_jobs.loc[df_predicted_jobs['user_id']==user] + print('number of predicted jobs: ', len(df_predicted_user)) + if len(df_predicted_user) == 0: + continue + df_predicted_user_hist = df_predicted_user.loc[df_predicted_user['pred_method'] == 'history'] + if(len(df_predicted_user_hist) > 0): + print('historic MSE when used only: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean'])) + print('historic MAPE when used only: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean'])) + print('historic MSE when selected: ', mean_squared_error(df_predicted_user_hist['total_power_mean_watts'], df_predicted_user_hist['hist_pred_total_power_mean'])) + #df_predicted_jobs_notnan=df_predicted_user.dropna(subset='reg_pred_pp0_mean') + df_predicted_user_reg = df_predicted_user.loc[df_predicted_user['pred_method'] == 'regression'] + if(len(df_predicted_user_reg) > 0): + print('regression MSE when 
selected: ', mean_squared_error(df_predicted_user_reg['total_power_mean_watts'], df_predicted_user_reg['reg_pred_total_power_mean'])) + print('final MSE: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + print('final MAPE: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + print('final MAE: ', mean_absolute_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + """ + + return df_predicted_jobs + +""" +Save predicted jobs data to csv +""" +def save_results(df_predicted_jobs, outputfile): + if len(df_predicted_jobs) == 0: + return + else: + df_predicted_jobs.to_csv(outputfile, index=False) + +""" +Run workflow +""" +def run_workflow(jobfile, outputfile, user): + lst_users, jobs_users, dct_users_first_job_timestamp = read_data(jobfile, user) + df_predicted_jobs = predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp) + save_results(df_predicted_jobs, outputfile) + +""" +Read Command line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Run the sophisticated power prediction method') + + p.add_argument("--jobfile", "-j", type=str, required=True, + help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)") + p.add_argument("--outputfile", "-o", type=str, required=True, + help="Output file name (optionally with global path)") + p.add_argument("--user", "-u", type=int, required=True, + help="User ID") + + return(p.parse_args()) + +if __name__ == '__main__': + + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + try: + args = read_cli() + except: + print('Try $python process_marconi_jobs.py --help') + sys.exit(1) + + run_workflow(args.jobfile, args.outputfile, args.user) \ No newline at end of file diff 
--git a/scripts-py/expe_energumen/predict_jobs_power_allmethods_mean.py b/scripts-py/expe_energumen/predict_jobs_power_allmethods_mean.py new file mode 100644 index 0000000000000000000000000000000000000000..35c00532646b9d6b9daa4c9b81328c67528dfa1c --- /dev/null +++ b/scripts-py/expe_energumen/predict_jobs_power_allmethods_mean.py @@ -0,0 +1,494 @@ +#!/usr/bin/env python3 +import sys +import argparse +import numpy as np +import pandas as pd +from sklearn.model_selection import train_test_split +from sklearn.linear_model import LinearRegression, Lasso, Ridge, SGDRegressor +from sklearn.svm import SVR, LinearSVR +from sklearn.ensemble import HistGradientBoostingRegressor, RandomForestRegressor +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import StandardScaler +from sklearn.model_selection import train_test_split, GridSearchCV +from sklearn.feature_selection import RFE +from sklearn.metrics import r2_score, mean_absolute_percentage_error, mean_absolute_error, mean_squared_error +from itertools import product + +""" +Initial variables +""" +## Time constants +NANOSECONDS_ONE_SECOND = 1e9 +SEC_ONE_MINUTE=60 +SEC_ONE_HOUR=60*SEC_ONE_MINUTE +SEC_ONE_DAY=24*SEC_ONE_HOUR +SEC_ONE_WEEK=7*SEC_ONE_DAY + +# Experiments parameters +## values for job submission time difference penalization +POWER_FACTORS={"sublinear": 0.5, + "linear": 1.0, + "quadratic": 2.0} +## Predict using only jobs with the same nb of resources +SAME_NB_RESOURCES={"yes": True, + "no": False} + + +## List of models used in the regression +MODELS=[ + {'key':'LinearRegression', 'value':LinearRegression, 'kwargs': {'n_jobs' : -1}}, + {'key':'RandomForestRegressor', 'value':RandomForestRegressor, 'kwargs': {'n_jobs' : -1}}, + {'key':'LinearSVR', 'value':LinearSVR, 'kwargs': {'max_iter' : 100000}}, + {'key':'SGDRegressor', 'value':SGDRegressor, 'kwargs': {'max_iter' : 100000}}, +] +MODEL_NAMES=['LinearRegression','RandomForestRegressor','LinearSVR','SGDRegressor'] + +## train model only 
if the user's number of jobs is at least MIN_NB_JOBS +MIN_NB_JOBS=[10] +## Further reduce train set size (used for testing, default must be 0.0) +## For info: regression train/test split is 0.66 train and 0.33 test +#PERCENTAGE_TRAINING_DATA=[0.75,0.5,0.25,0.0] +PERCENTAGE_TRAINING_DATA=[0.0] +## Columns for regression training experiments parameters +EXP_REG_COLUMNS=['min_reg_nb_jobs', 'model', 'percentage_training_data'] +## Columns for historical prediction experiments parameters +EXP_HIST_COLUMNS=['power_factor', 'same_nb_resources'] +## Job Features +# I could add "sime_limit_str" and "tres_per_node" but this needs more preprocessing +FEATURE_LABELS=['num_cpus', 'num_nodes', 'submission_date_hour', "walltime"] +#FEATURE_LABELS=['num_cpus','submission_date_hour', 'nb_sockets'] +## Lag features used in regression +LAG_FEATURE_LABELS=['lag_total_power_mean', 'lag_total_power_std', 'lag_total_power_prev'] +## The value we want to predict +TARGET_LABEL='total_power_mean_watts' + +SVR_TO_LINEARSVR_THRESHOLD = 10000 + +# Define the parameter grid +param_grid = { + 'SVR':{ + 'svr__C': np.logspace(-3, 3, 7), + 'svr__gamma': np.logspace(-3, 3, 7), + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'LinearSVR': { + 'linearsvr__C': np.logspace(-3, 3, 7), # Use 'linearsvr__C' for LinearSVR + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'RandomForestRegressor': { + 'randomforestregressor__n_estimators': [10, 50, 100, 200], + 'randomforestregressor__max_depth': [None, 10, 20, 30], + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'LinearRegression': { + #'linearregression__': {}, + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + }, + 'SGDRegressor': { + 'sgdregressor__alpha': np.logspace(-3, 3, 7), + 'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS))) + } +} 
+ +## Create SKlearn pipeline according to the model +def create_pipeline(model_conf): + model_obj = model_conf["value"](**model_conf["kwargs"]) + pipeline = make_pipeline(StandardScaler(), RFE(LinearRegression(n_jobs=-1)), model_obj) + return pipeline + +# Function to convert time strings to total seconds +def time_str_to_seconds(time_str): + #print(time_str) + components = time_str.split('-') + + if len(components) == 1: # No days component + time_parts = time_str.split(':') + if len(time_parts) == 1: # Format M:SS + time_str = "0:" + time_str + elif len(time_parts) == 2: # Format MM:SS + time_str = "00:" + time_str + return pd.to_timedelta(time_str).total_seconds() + elif len(components) == 2: # Format DD-HH:MM:SS + days, time_part = components + time_seconds = pd.to_timedelta(time_part).total_seconds() + return int(days) * 24 * 3600 + time_seconds + else: + return 180 * SEC_ONE_DAY # Handle infinite = 180 days = 6 months + +""" +Read and prepare data to use in the prediction +""" +def read_data(jobfile, user): + ## Holds the first job of each user (key=user_id) + dct_users_first_job_timestamp={} + + ## Holds a dataframe of jobs for each user (key=user_id) + jobs_users={} + + ## Read jobfile + df_jobs = pd.read_csv(jobfile, low_memory=False) + + ## Convert datetime columns to integer timestamps in seconds + df_jobs["submit_time"] = pd.to_datetime(df_jobs['submit_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["start_time"] = pd.to_datetime(df_jobs['start_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["end_time"] = pd.to_datetime(df_jobs['end_time']).astype(int) / NANOSECONDS_ONE_SECOND + df_jobs["walltime"] = df_jobs['time_limit_str'].apply(time_str_to_seconds) + + lst_users = [user] + + ## Sort jobs by submission time + df_jobs=df_jobs.sort_values(by='submit_time').reset_index(drop=True) + ## Get (back) datetime objects + df_jobs['submission_datetime'] = pd.to_datetime(df_jobs['submit_time'], unit='s') + df_jobs['submission_date_hour'] = 
pd.DatetimeIndex(df_jobs['submission_datetime']).hour + + ## Initialize new columns + df_jobs['user_submission_week']=np.nan + df_jobs['lag_total_power_mean']=np.nan + df_jobs['lag_total_power_std']=np.nan + df_jobs['lag_total_power_prev']=np.nan + df_jobs['hist_pred_total_power_mean']=np.nan + #df_jobs['reg_pred_total_power_mean']=np.nan + col_names = [model_name+"_"+TARGET_LABEL for model_name in MODEL_NAMES] + _ = [df_jobs.assign(x=np.nan) for x in col_names] + + ## Populate jobs_users and dct_users_first_job_timestamp + ## Also computing the week number of the job submission + for user in lst_users: + jobs_user = df_jobs.loc[df_jobs['user_id']==user].copy().sort_values(by='submit_time').reset_index() + dct_users_first_job_timestamp[user] = jobs_user.head(1)['submit_time'].values[0] + first_job_timestamp = dct_users_first_job_timestamp[user] + jobs_user['user_submission_week'] = (jobs_user['submit_time']-first_job_timestamp)//SEC_ONE_WEEK + jobs_users[user]=jobs_user + #print(user, len(jobs_users[user])) + #print(jobs_user) + #break + + return lst_users, jobs_users, dct_users_first_job_timestamp + +""" +Run prediction experiments +""" +def run_experiments(hist_design, + reg_design, + jobs_users, + dct_users_jobs, + dct_users_first_job_timestamp, + lst_results, + lst_predicted_jobs, + user=None): + + ## Initialize the dict for the week + dct_users_jobs[user] = {} + + ## Get the jobs dataframe of the user + jobs_user = jobs_users[user] + + ## Get the list of weeks that user submitted jobs (e.g., [0.0, 1.0, 3.0]) + jobs_weeks=jobs_user['user_submission_week'].drop_duplicates().sort_values().to_list() + + + ## Get te timestamp of the user's first job + first_job_timestamp = dct_users_first_job_timestamp[user] + + ## Run the prediction routine for each week the user submitted jobs + for week in jobs_weeks: + ## Get the dataframe of user's jobs at week + user_jobs_week = jobs_user.loc[jobs_user['user_submission_week']==week].copy() + + ## for each job in week + for 
index, job in user_jobs_week.iterrows(): + if index == 0: + continue + else: + ################################################################## + ### Weighted Mean of up previous finished jobs in the jobs history + ### add the mean power of previous user's jobs + ### (weighted history approach) as features + ################################################################## + + ## Submission timestamp of the job + job_subm_time = job['submit_time'] + + ## Timestamp where the history will start + history_start = first_job_timestamp + + ## History window size (start counting since the first user job submission) + history_size = job_subm_time - history_start + + ## Get only jobs that finished before the row submission time + prev_jobs = jobs_user.loc[jobs_user['end_time']<=job_subm_time].copy().sort_values(by='end_time').reset_index(drop=True) + + ## Further filter jobs if same_nb_resources (evals to False by default) + if hist_design['same_nb_resources'].values[0] == True: + prev_jobs=prev_jobs.loc[prev_jobs['nb_resources']==job['nb_resources']] + + if len(prev_jobs) < 1: + ## No job has finished before job_subm_time + continue + else: + ## Compute the standard deviation of the jobs in the history (lag feature) + user_jobs_week.loc[index, 'lag_total_power_std']=prev_jobs['total_power_mean_watts'].std() + + ## Assign weights for the previous jobs in the history according to the power_factor + prev_jobs['weight'] = 1 - ((job_subm_time - prev_jobs['end_time'])/history_size) + prev_jobs['weight'] = prev_jobs['weight'].pow(hist_design['power_factor'].values[0]) + + ## Compute the mean of the jobs in the history (lag feature) + user_jobs_week.loc[index,'lag_total_power_mean'] = (prev_jobs['total_power_mean_watts']*prev_jobs['weight']).sum()/prev_jobs['weight'].sum() + + ## Compute the mean of the jobs in the history (historical prediction result) + user_jobs_week.loc[index,'hist_pred_total_power_mean'] = user_jobs_week.loc[index,'lag_total_power_mean'] + + ## Get the 
power metric of the most recent user job (lag feature, not used by default) + user_jobs_week.loc[index,'lag_total_power_prev']=prev_jobs.iloc[-1]['total_power_mean_watts'] + + ## Add the user's week jobs in the dict + dct_users_jobs[user][week]=user_jobs_week + + prev_weeks=[] + + #################################################################################### + ### Regression multi-model power prediction + ### For each week >= 1 use the first 2/3 of the user's finished job history as train + ### and the remaining 1/3 as test + ### Run regression fitting for each of the models in MODELS + ### Select the best regression model regarding the test mean squared error (MSE) + ### Compare with the MSE of the historic prediction on the test + ### For week+1 use the model (regression or historic) with the lowest MSE + #################################################################################### + for i, week in enumerate(jobs_weeks): + + ## We skip the first week for regression + if i == 0: + prev_weeks.append(week) + continue + + ## Get the user's jobs at current week (only jobs with historic data prediction) + user_jobs_week=dct_users_jobs[user][week] + + ## Get all previous jobs for regression fitting + model_jobs_user=pd.concat([dct_users_jobs[user][w] for w in prev_weeks]).dropna(subset=LAG_FEATURE_LABELS) + + ## Add week in the list of previous weeks to get data + ### This will be used at the next week iteration + prev_weeks.append(week) + + ##DEBUG + #print(len(model_jobs_user)) + + regression=False + + ## Iterate over the experiments parameters (which vary the models) + for index, exp in reg_design.iterrows(): + ## Only do regression training if the number of jobs to train > min_reg_nb_jobs (10 by default) + if len(model_jobs_user) >= exp['min_reg_nb_jobs']: + regression=True + + ## Append result data to the experiments parameters dict + dct_result = exp.to_dict() + dct_result['user_id']=user + dct_result['user_week']=week + 
dct_result['hist_power_factor']=hist_design['power_factor'].values[0] + dct_result['hist_same_nb_resources']=hist_design['same_nb_resources'].values[0] + + ## Select features and target label + X=model_jobs_user[FEATURE_LABELS+LAG_FEATURE_LABELS] + y=model_jobs_user[TARGET_LABEL] + + ## Train test split + ## Test split is important if we want to switch between algortihms at each week + #X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, shuffle=False) + + ## With this we only evaluate the training performance. + ## This is ok, since X, y are jobs from previous weeks. + ## We "test" the models at the current week, which is not present in X, y + X_train, y_train = X, y + + ## exp['percentage_training_data'] evals to 0 -> nothing changes + X_train = X_train.iloc[int(len(X_train)*exp['percentage_training_data']):len(X_train)] + y_train = y_train.iloc[int(len(y_train)*exp['percentage_training_data']):len(y_train)] + + ## Create an ML pipeline of standard scaling -> model and then train the model + + pipeline = create_pipeline(exp["model"]) + + #print("Week ", week, "Model: ", model, "Train Size: ", len(X_train)) + + reg = GridSearchCV(pipeline, param_grid[exp["model"]["key"]], cv=5, scoring='neg_mean_squared_error', n_jobs=-1).fit(X_train, y_train) + + ## Append result data to the experiments parameters dict + dct_result['model_name']=exp['model']['key'] + #dct_result['model_name']=model + + best_predictor = reg.best_estimator_ + dct_result['model_object']=best_predictor + + ## Predict on the train dataset + test_predictions=best_predictor.predict(X_train) + + ## Append result data to the experiments parameters dict + dct_result['model']=exp['model']['key'] + + # Access the selected features using the RFE estimator from the best model + best_rfe = reg.best_estimator_.named_steps['rfe'] + selected_features = np.where(best_rfe.support_)[0] + + cols = FEATURE_LABELS+LAG_FEATURE_LABELS + feature_names = [cols[x] for x in selected_features] + 
dct_result['features']=feature_names + #print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names) + dct_result['reg_total_power_mean_mape']=mean_absolute_percentage_error(y_train, test_predictions) + dct_result['reg_total_power_mean_mae']=mean_absolute_error(y_train, test_predictions) + dct_result['reg_total_power_mean_mse']=mean_squared_error(y_train, test_predictions) + print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names, "MAPE: ", dct_result['reg_total_power_mean_mape'], "MAE: ", dct_result['reg_total_power_mean_mae'], "MSE: ", dct_result['reg_total_power_mean_mse']) + + ### Results of the historic data power prediction + dct_result['hist_total_power_mean_mape']=mean_absolute_percentage_error(y_train, X_train['lag_total_power_mean']) + dct_result['hist_total_power_mean_mae']=mean_absolute_error(y_train, X_train['lag_total_power_mean']) + dct_result['hist_total_power_mean_mse']=mean_squared_error(y_train, X_train['lag_total_power_mean']) + #print("User: ", user, "Week ", week, "Model: ", "History", "MAPE: ", dct_result['hist_total_power_mean_mape'], "MAE: ", dct_result['hist_total_power_mean_mae'], "MSE: ", dct_result['hist_total_power_mean_mse']) + + ## Append to the global list of training results + lst_results.append(dct_result) + + ## If regression training was performed using prev_weeks, select the best model for week + if regression == True: + + ## Get the week prediction results from the global list lst_results + df_week_results=pd.DataFrame(lst_results) + df_week_results=df_week_results.loc[df_week_results['user_week']==week] + + #print(df_week_results) + + ## Predict for week using all models + for _, result in df_week_results.iterrows(): + model_name = result['model_name'] + model_object = result['model_object'] + + col_label = model_name+"_"+TARGET_LABEL + user_jobs_week[col_label]=model_object.predict(user_jobs_week[FEATURE_LABELS+LAG_FEATURE_LABELS]) + + 
lst_predicted_jobs.append(user_jobs_week) + + return lst_predicted_jobs + +""" +Start prediction routine +""" +def predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp): + # Construct the design of experiments for the prediction + ## Historical prediction design + ## (POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"]) evals to (2.0, False) + design_hist_pred=[(POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"])] + + ## Regression design + ## MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA evals to [10], MODELS, [0.0] + ## I switch between SVR and LinearSVR if the training dataset is larger than 10k, so MODELS are temporarily ignored + design_reg_pred=list(product(MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA)) + + df_hist_design=pd.DataFrame(design_hist_pred, columns=EXP_HIST_COLUMNS) + df_reg_design=pd.DataFrame(design_reg_pred, columns=EXP_REG_COLUMNS) + #print(len(df_ff_design)) + + ## Holds the list of regression results (TO VERIFY) + lst_results=[] + ## Holds the list of predicted jobs + ## Each position is a dataframe of predicted jobs of a user + lst_predicted_jobs=[] + ## Holds the weekly job submission for each user + ## Key is [user][week] + dct_users_jobs={} + + #for index, exp in df_ff_design.iterrows(): + for user in lst_users: + #print('user: ', user) + dct_users_jobs[user] = {} + lst_predicted_jobs = run_experiments(df_hist_design, + df_reg_design, + jobs_users, + dct_users_jobs, + dct_users_first_job_timestamp, + lst_results, + lst_predicted_jobs, + user=user) + #print(str(index) + ' of ' + str(len(df_ff_design)), end='\r') + + df_predicted_jobs = [] + + if lst_predicted_jobs: + df_predicted_jobs=pd.concat(lst_predicted_jobs) + + if len(df_predicted_jobs) != 0: + df_predicted_jobs=df_predicted_jobs.dropna(subset=['total_power_mean_watts','hist_pred_total_power_mean']) + + ## Print results stats, no longer needed + """ + for user in lst_users: + #print('user: ', user) + 
df_predicted_user=df_predicted_jobs.loc[df_predicted_jobs['user_id']==user] + print('number of predicted jobs: ', len(df_predicted_user)) + if len(df_predicted_user) == 0: + continue + df_predicted_user_hist = df_predicted_user.loc[df_predicted_user['pred_method'] == 'history'] + if(len(df_predicted_user_hist) > 0): + print('historic MSE when used only: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean'])) + print('historic MAPE when used only: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean'])) + print('historic MSE when selected: ', mean_squared_error(df_predicted_user_hist['total_power_mean_watts'], df_predicted_user_hist['hist_pred_total_power_mean'])) + #df_predicted_jobs_notnan=df_predicted_user.dropna(subset='reg_pred_pp0_mean') + df_predicted_user_reg = df_predicted_user.loc[df_predicted_user['pred_method'] == 'regression'] + if(len(df_predicted_user_reg) > 0): + print('regression MSE when selected: ', mean_squared_error(df_predicted_user_reg['total_power_mean_watts'], df_predicted_user_reg['reg_pred_total_power_mean'])) + print('final MSE: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + print('final MAPE: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + print('final MAE: ', mean_absolute_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean'])) + """ + + return df_predicted_jobs + +""" +Save predicted jobs data to csv +""" +def save_results(df_predicted_jobs, outputfile): + if len(df_predicted_jobs) == 0: + return + else: + df_predicted_jobs.to_csv(outputfile, index=False) + +""" +Run workflow +""" +def run_workflow(jobfile, outputfile, user): + lst_users, jobs_users, dct_users_first_job_timestamp = read_data(jobfile, user) 
+ df_predicted_jobs = predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp) + save_results(df_predicted_jobs, outputfile) + +""" +Read Command line interface +""" +def read_cli(): + # Make parser object + p = argparse.ArgumentParser(description='Run the sophisticated power prediction method') + + p.add_argument("--jobfile", "-j", type=str, required=True, + help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)") + p.add_argument("--outputfile", "-o", type=str, required=True, + help="Output file name (optionally with global path)") + p.add_argument("--user", "-u", type=int, required=True, + help="User ID") + + return(p.parse_args()) + +if __name__ == '__main__': + + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + try: + args = read_cli() + except: + print('Try $python process_marconi_jobs.py --help') + sys.exit(1) + + run_workflow(args.jobfile, args.outputfile, args.user) \ No newline at end of file diff --git a/scripts-py/expe_energumen/run_prediction_per_user_allmethods_max.py b/scripts-py/expe_energumen/run_prediction_per_user_allmethods_max.py new file mode 100644 index 0000000000000000000000000000000000000000..6864e3e2bddc4a9ccf471e5549f95799890371d1 --- /dev/null +++ b/scripts-py/expe_energumen/run_prediction_per_user_allmethods_max.py @@ -0,0 +1,60 @@ +import pandas as pd +import subprocess +import sys +import argparse +import os +import functools +from multiprocessing import Pool + + +""" +User power prediction subroutine +""" +def run_prediction_user(user_id, jobfile=None, outdir=None): + outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz' + command = ['python', 'predict_jobs_power.py', "-j", jobfile , '-o', outfile, '-u', str(user_id)] + subprocess.run(command) + +""" +Run Workflow +""" +def run_workflow(jobfile, outdir): + # Load the dataframe with "user_id" column + df = pd.read_csv(jobfile) + + 
# Iterate over each user_id and run predict_jobs_power_allmethods_max.py for it + for user_id in df['user_id'].drop_duplicates(): + outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz' + # Skip user if prediction was already performed (output file exists) + if os.path.isfile(outfile) == True: + continue + command = ['python', 'predict_jobs_power_allmethods_max.py', "-j", jobfile , '-o', outfile, '-u', str(user_id)] + subprocess.run(command) + +""" +Read Command line interface +""" +def read_cli(): + # Build the command-line argument parser + p = argparse.ArgumentParser(description='Run the sophisticated power prediction method, but one call for each user') + + p.add_argument("--jobfile", "-j", type=str, required=True, + help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)") + p.add_argument("--outdir", "-o", type=str, required=True, + help="Directory of the output files") + + return(p.parse_args()) + +if __name__ == '__main__': + + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + try: + args = read_cli() + except: + print('Try $python run_prediction_per_user --help') + sys.exit(1) + + run_workflow(args.jobfile, args.outdir) \ No newline at end of file diff --git a/scripts-py/expe_energumen/run_prediction_per_user_allmethods_mean.py b/scripts-py/expe_energumen/run_prediction_per_user_allmethods_mean.py new file mode 100644 index 0000000000000000000000000000000000000000..7571f6c46f583f5bd67bac2a9d02d47caae8998c --- /dev/null +++ b/scripts-py/expe_energumen/run_prediction_per_user_allmethods_mean.py @@ -0,0 +1,60 @@ +import pandas as pd +import subprocess +import sys +import argparse +import os +import functools +from multiprocessing import Pool + + +""" +User power prediction subroutine +""" +def run_prediction_user(user_id, jobfile=None, outdir=None): + outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz' + command = ['python', 
'predict_jobs_power.py', "-j", jobfile , '-o', outfile, '-u', str(user_id)] + subprocess.run(command) + +""" +Run Workflow +""" +def run_workflow(jobfile, outdir): + # Load the dataframe with "user_id" column + df = pd.read_csv(jobfile) + + # Iterate over each user_id and run predict_jobs_power_allmethods_mean.py for it + for user_id in df['user_id'].drop_duplicates(): + outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz' + # Skip user if prediction was already performed (output file exists) + if os.path.isfile(outfile) == True: + continue + command = ['python', 'predict_jobs_power_allmethods_mean.py', "-j", jobfile , '-o', outfile, '-u', str(user_id)] + subprocess.run(command) + +""" +Read Command line interface +""" +def read_cli(): + # Build the command-line argument parser + p = argparse.ArgumentParser(description='Run the sophisticated power prediction method, but one call for each user') + + p.add_argument("--jobfile", "-j", type=str, required=True, + help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)") + p.add_argument("--outdir", "-o", type=str, required=True, + help="Directory of the output files") + + return(p.parse_args()) + +if __name__ == '__main__': + + if sys.version_info<(3,5,0): + sys.stderr.write("You need python 3.5 or later to run this script\n") + sys.exit(1) + + try: + args = read_cli() + except: + print('Try $python run_prediction_per_user --help') + sys.exit(1) + + run_workflow(args.jobfile, args.outdir) \ No newline at end of file