Commit 17fd6eeb authored by danilo.carastandossantos

init prediction artifacts

parent 79b886ec
...@@ -294,19 +294,6 @@ The step-by-step instructions of this document can be used in several ways *depe
You can follow all steps below in this case,
but *please do note that this is disk/bandwidth/computation-intensive.*
== Job power prediction <sec-job-power-pred>
How to reproduce the power predictions of jobs has not been written yet.
The expected output data of this section has however been stored on #link(zenodo-url)[Zenodo].
//#tododanilo[how to reproduce this experiment?]
#fullbox(footer: [Disk: 82 Mo.])[
#filehashes((
"fdcc47998a7e998abde325162833b23e", "power_pred_users_allmethods_max.tar.gz",
"954f782a75c9a5b21c53a95c0218e220", "power_pred_users_allmethods_mean.tar.gz",
))
]
== Analysis and modeling of the power behavior of Marconi100 nodes
=== Get power and job Marconi100 traces on your disk <sec-m100-power-job-traces>
This section downloads parts of the Marconi100 trace as archives from #link("https://gitlab.com/ecs-lab/exadata")[the ExaData Zenodo files], checks that the archives have the right content (via MD5 checksums), extracts the data needed by later stages of the pipeline (node power usage traces, jobs information traces), then finally removes unneeded extracted files and the downloaded archives.
...@@ -381,6 +368,258 @@ Required input files.
Their content should be completely reproducible though.
]
== Job power prediction <sec-job-power-pred>
The experimental workflow consists of three parts: (i) pre-processing of the original data,
(ii) prediction of the jobs' mean and maximum power consumption, and (iii) analysis of the prediction results.
=== Pre-processing
==== Step 1
#tododanilo[in the script source-code: change output filename of step 1
from a_0_filter12_singlenode.csv and from a_0_filter12_multinode.csv
to 22-0X_filter12_singlenode.csv and 22-0X_filter12_multinode.csv]
#fullbox(footer:[Memory: 128 Go. Time (sequential): 18:00:00])[
```sh
python ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \
  -j ./m100-data/22-01_jobs.parquet \
  -m ./m100-data/22-01_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \
  -j ./m100-data/22-02_jobs.parquet \
  -m ./m100-data/22-02_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \
  -j ./m100-data/22-03_jobs.parquet \
  -m ./m100-data/22-03_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \
  -j ./m100-data/22-04_jobs.parquet \
  -m ./m100-data/22-04_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \
  -j ./m100-data/22-05_jobs.parquet \
  -m ./m100-data/22-05_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \
  -j ./m100-data/22-06_jobs.parquet \
  -m ./m100-data/22-06_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \
  -j ./m100-data/22-07_jobs.parquet \
  -m ./m100-data/22-07_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \
  -j ./m100-data/22-08_jobs.parquet \
  -m ./m100-data/22-08_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \
  -j ./m100-data/22-09_jobs.parquet \
  -m ./m100-data/22-09_power_total.parquet
```
]
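Since step 1 only varies the month in the input file names, the nine commands above can also be run as a single loop. The sketch below is an optional equivalent, assuming the same working directory as the explicit list above; the same pattern applies to the other per-month steps below.

```sh
# step 1 for all nine months of the trace (equivalent to the explicit commands above)
for month in 01 02 03 04 05 06 07 08 09; do
  python ./scripts-py/expe_energumen/m100_pred_preprocess_1.py \
    -j ./m100-data/22-${month}_jobs.parquet \
    -m ./m100-data/22-${month}_power_total.parquet
done
```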
==== Step 2
#tododanilo[in the script source-code: change output filename of step 2
from a_0_filter123_singlenode.csv and from a_0_filter123_multinode.csv
to 22-0X_filter123_singlenode.csv and 22-0X_filter123_multinode.csv]
#fullbox(footer:[Memory: 128 Go. Time (sequential): 66:00:00])[
```sh
python ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
  -js ./m100-data/22-01_filter12_singlenode.csv \
  -jm ./m100-data/22-01_filter12_multinode.csv \
  -m ./m100-data/22-01_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
  -js ./m100-data/22-02_filter12_singlenode.csv \
  -jm ./m100-data/22-02_filter12_multinode.csv \
  -m ./m100-data/22-02_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
  -js ./m100-data/22-03_filter12_singlenode.csv \
  -jm ./m100-data/22-03_filter12_multinode.csv \
  -m ./m100-data/22-03_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
  -js ./m100-data/22-04_filter12_singlenode.csv \
  -jm ./m100-data/22-04_filter12_multinode.csv \
  -m ./m100-data/22-04_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
  -js ./m100-data/22-05_filter12_singlenode.csv \
  -jm ./m100-data/22-05_filter12_multinode.csv \
  -m ./m100-data/22-05_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
  -js ./m100-data/22-06_filter12_singlenode.csv \
  -jm ./m100-data/22-06_filter12_multinode.csv \
  -m ./m100-data/22-06_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
  -js ./m100-data/22-07_filter12_singlenode.csv \
  -jm ./m100-data/22-07_filter12_multinode.csv \
  -m ./m100-data/22-07_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
  -js ./m100-data/22-08_filter12_singlenode.csv \
  -jm ./m100-data/22-08_filter12_multinode.csv \
  -m ./m100-data/22-08_power_total.parquet
python ./scripts-py/expe_energumen/m100_pred_preprocess_2.py \
  -js ./m100-data/22-09_filter12_singlenode.csv \
  -jm ./m100-data/22-09_filter12_multinode.csv \
  -m ./m100-data/22-09_power_total.parquet
```
]
==== Aggregate step 2 output into a single file
#fullbox(footer: [Disk: 32 Go.])[
```sh
find . -name '*filter123*' | tar -zcvf exadata_job_energy_profiles.tar.gz --files-from -
```
]
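As an optional sanity check before moving on, the archive content can be listed; the sketch below only assumes the archive was created in the current directory as above.

```sh
# list the first entries of the archive to confirm the filter123 files were picked up
tar -tzf exadata_job_energy_profiles.tar.gz | head
```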
==== Compute power metrics and add job information
#tododanilo[Script source-code: change -d (dir path) and pass the path to the necessary files]
#fullbox(footer: [Disk: 32 Go.])[
```sh
python ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \
  -d ./data/year_month=22-01
python ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \
  -d ./data/year_month=22-02
python ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \
  -d ./data/year_month=22-03
python ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \
  -d ./data/year_month=22-04
python ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \
  -d ./data/year_month=22-05
python ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \
  -d ./data/year_month=22-06
python ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \
  -d ./data/year_month=22-07
python ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \
  -d ./data/year_month=22-08
python ./scripts-py/expe_energumen/m100_pred_jobs_extract_power_metrics.py \
  -d ./data/year_month=22-09
```
]
==== Merge files into a single CSV file
This step produces `filter123_all_jobs_aggmetrics.csv.gz`, the input file required by the prediction scripts.
#tododanilo[check if /m100-data/ path is correct and also the path of the output]
#fullbox(footer: [Disk: 82 Mo.])[
```sh
python ./scripts-py/expe_energumen/m100_pred_merge_jobfiles.py -d ./m100-data/
```
]
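As an optional check (assuming the output path used above), the header line of the merged file can be inspected to confirm that the per-job power aggregation metrics are present:

```sh
# print the CSV header of the merged job table
zcat ./m100-data/filter123_all_jobs_aggmetrics.csv.gz | head -n 1
```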
=== Predicting job mean and maximum power consumption
#fullbox(footer:[Memory: 128 Go. Time (sequential): 72:00:00])[
```sh
mkdir ./m100-data/total_power_mean_predictions_users_allmethods_mean
mkdir ./m100-data/total_power_mean_predictions_users_allmethods_max
python ./scripts-py/expe_energumen/run_prediction_per_user_allmethods_mean.py \
-j ./m100-data/filter123_all_jobs_aggmetrics.csv.gz \
-o ./m100-data/total_power_mean_predictions_users_allmethods_mean
python ./scripts-py/expe_energumen/run_prediction_per_user_allmethods_max.py \
-j ./m100-data/filter123_all_jobs_aggmetrics.csv.gz \
-o ./m100-data/total_power_mean_predictions_users_allmethods_max
```
]
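The two prediction runs are independent: they read the same input file and write to separate directories. If the machine has enough memory for both, they can optionally be launched concurrently, as in the sketch below.

```sh
# run the mean and max predictions in parallel (optional; doubles the memory requirement)
python ./scripts-py/expe_energumen/run_prediction_per_user_allmethods_mean.py \
  -j ./m100-data/filter123_all_jobs_aggmetrics.csv.gz \
  -o ./m100-data/total_power_mean_predictions_users_allmethods_mean &
python ./scripts-py/expe_energumen/run_prediction_per_user_allmethods_max.py \
  -j ./m100-data/filter123_all_jobs_aggmetrics.csv.gz \
  -o ./m100-data/total_power_mean_predictions_users_allmethods_max &
wait
```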
==== Compressing prediction output into single files
#fullbox(footer:[Disk: 82 Mo.])[
```sh
tar -cvzf ./m100-data/power_pred_users_allmethods_max.tar.gz \
  ./m100-data/total_power_mean_predictions_users_allmethods_max
tar -cvzf ./m100-data/power_pred_users_allmethods_mean.tar.gz \
  ./m100-data/total_power_mean_predictions_users_allmethods_mean
```
]
The expected output data of this section has been stored on #link(zenodo-url)[Zenodo].
//#tododanilo[how to reproduce this experiment?]
#fullbox(footer: [Disk: 82 Mo.])[
#filehashes((
"fdcc47998a7e998abde325162833b23e", "power_pred_users_allmethods_max.tar.gz",
"954f782a75c9a5b21c53a95c0218e220", "power_pred_users_allmethods_mean.tar.gz",
))
]
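To compare a local reproduction against the published archives, the MD5 sums can be recomputed as below. Note that archives rebuilt locally may not match these sums bit for bit, since `tar` and `gzip` embed metadata such as timestamps.

```sh
# recompute the MD5 sums of the prediction archives
md5sum ./m100-data/power_pred_users_allmethods_max.tar.gz \
       ./m100-data/power_pred_users_allmethods_mean.tar.gz
```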
=== Analyzing prediction results
==== Required data
Output from the previous section:
- `m100-data/power_pred_users_allmethods_mean.tar.gz`, the jobs' mean power predictions.
- `m100-data/power_pred_users_allmethods_max.tar.gz`, the jobs' maximum power predictions.
#tododanilo[Add a notebook that makes the plots]
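Until the notebook is added, a minimal sketch for getting the prediction files on disk is to extract both archives and check that one `filter123_user_<id>_total_power_mean_pred.csv.gz` file is present per user and per archive (the extraction directory name below is arbitrary):

```sh
mkdir -p ./m100-data/power_pred_extracted
tar -xzf ./m100-data/power_pred_users_allmethods_mean.tar.gz -C ./m100-data/power_pred_extracted
tar -xzf ./m100-data/power_pred_users_allmethods_max.tar.gz -C ./m100-data/power_pred_extracted
# count the per-user prediction files found in both archives
find ./m100-data/power_pred_extracted -name '*.csv.gz' | wc -l
```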
== Job scheduling with power prediction <sec-sched>
This section shows how to reproduce Sections 6.4 and 6.5 of article @lightpredenergy.
...
import argparse
import sys
import pandas as pd
import numpy as np
from scipy.stats import iqr
"""
Read input data
"""
def read_data(rootdir):
df_jobs_single = pd.read_csv(rootdir+"/plugin=job_table/metric=job_info_marconi100/a_0_filter123_singlenode.csv")
df_jobs_multi = pd.read_csv(rootdir+"/plugin=job_table/metric=job_info_marconi100/a_0_filter123_multinode.csv")
df_power_single = pd.read_csv(rootdir+"/plugin=ipmi_pub/metric=total_power/a_0_filter123_singlenode.csv")
df_power_multi = pd.read_csv(rootdir+"/plugin=ipmi_pub/metric=total_power/a_0_filter123_multinode.csv")
df_jobs = pd.concat([df_jobs_single, df_jobs_multi]).reset_index(drop=True)
df_power = pd.concat([df_power_single, df_power_multi]).reset_index(drop=True)
df_power['node'] = pd.to_numeric(df_power['node'])
df_power['value'] = pd.to_numeric(df_power['value'])
return df_jobs, df_power
"""
Calculate jobs' power aggregation metrics
"""
def calculate_agg_metrics(df_jobs, df_power):
powertrace_jobs = [df_power[df_power["job_id"]==x]['value'].values for x in df_jobs['job_id'].values]
df_jobs["total_power_max_watts"] = [x.max() for x in powertrace_jobs]
df_jobs["total_power_mean_watts"] = [x.mean() for x in powertrace_jobs]
df_jobs["total_power_median_watts"] = [np.median(x) for x in powertrace_jobs]
df_jobs["total_power_std_watts"] = [x.std() for x in powertrace_jobs]
df_jobs["total_power_iqr_watts"] = [iqr(x, rng=(25, 75)) for x in powertrace_jobs]
df_jobs["total_power_1090range_watts"] = [iqr(x, rng=(10, 90)) for x in powertrace_jobs]
return df_jobs
"""
Save results
"""
def save_results(df_jobs_aggmetrics, rootdir):
df_jobs_aggmetrics.to_csv(rootdir+"/plugin=job_table/metric=job_info_marconi100/a_0_filter123_aggmetrics.csv")
"""
Run workflow
"""
def run_workflow(rootdir):
df_jobs, df_power = read_data(rootdir)
df_jobs_aggmetrics = calculate_agg_metrics(df_jobs, df_power)
print(df_jobs_aggmetrics[["run_time",
"num_nodes",
"total_power_max_watts",
"total_power_mean_watts",
"total_power_median_watts"]])
save_results(df_jobs_aggmetrics, rootdir)
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Process ExaData data to extract per-job power metrics. This script uses the output from process_marconi_jobs_2.py')
p.add_argument("--rootdir", "-d", type=str, required=True,
help="Root directory of the trace")
return(p.parse_args())
if __name__ == '__main__':
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
try:
args = read_cli()
except:
print('Try $python process_marconi_jobs.py --help')
sys.exit(1)
run_workflow(args.rootdir)
import glob
import pandas as pd
import sys
import argparse
"""
Read job files spread in the months folders
"""
def read_jobfiles(rootdir):
#DATASET_PATH = "/home/dancarastan/Documentos/exadata_job_energy_profiles/"
jobfiles_list = glob.glob(rootdir+"*"+"/plugin=job_table"+"/metric=job_info_marconi100"+"/a_0_filter123_aggmetrics.csv")
#print(len(jobfiles_list))
df_jobs = pd.concat([pd.read_csv(jobfile) for jobfile in jobfiles_list]).reset_index(drop=True)
return df_jobs
"""
Save results to compressed csv
"""
def save_results(df_jobs, rootdir):
df_jobs.to_csv(rootdir+"/filter123_all_jobs_aggmetrics.csv.gz", index=False)
"""
Run workflow
"""
def run_workflow(rootdir):
    df_jobs = read_jobfiles(rootdir)
save_results(df_jobs, rootdir)
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Concatenate job table files into a single csv file')
p.add_argument("--rootdir", "-d", type=str, required=True,
help="Root directory of the trace")
return(p.parse_args())
if __name__ == '__main__':
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
try:
args = read_cli()
except:
print('Try $python process_marconi_jobs.py --help')
sys.exit(1)
run_workflow(args.rootdir)
#!/usr/bin/env python3
import sys
import argparse
import ast
import pandas as pd
import numpy as np
"""
Read Data
"""
def read_data(jobfile, metricfile):
df_jobs = pd.read_parquet(jobfile)
df_power = pd.read_parquet(metricfile)
## I call here df_power, but in reality it can be any metric
return df_jobs, df_power
"""
Convert some string values to numeric
Convert timestamps to seconds for job runtime
"""
def preprocess_data(df_jobs, df_power):
df_power['node'] = pd.to_numeric(df_power['node'])
df_power['value'] = pd.to_numeric(df_power['value'])
df_jobs["run_time"] = (df_jobs['end_time']-df_jobs['start_time']) / np.timedelta64(1, 's')
print("Total number of jobs: ", len(df_jobs))
return df_jobs, df_power
"""
Filter 1: remove jobs that take less than a minute
"""
def filter1(df_jobs):
SECONDS_ONE_MINUTE = 60.0
df_jobs_f1 = df_jobs.loc[df_jobs["run_time"] >= SECONDS_ONE_MINUTE]
print("Number of jobs after filter 1: ", str(len(df_jobs_f1)))
return df_jobs_f1
"""
Filter 2: remove jobs with no energy profile
Method for single-node jobs
"""
def filter2_single(df_jobs, df_power):
df_jobs_f1_single = df_jobs[df_jobs["num_nodes"]==1]
#removing malformed "nodes"
    df_jobs_f1_single = df_jobs_f1_single[df_jobs_f1_single["nodes"].str.match(r"\[\d+\]")].copy().reset_index(drop=True)
df_jobs_f1_single["node"] = [ast.literal_eval(x)[0] for x in df_jobs_f1_single['nodes']]
#use this for smaller inputs
#sample = df_jobs_f1_single[0:10000].copy().reset_index(drop=True)
sample = df_jobs_f1_single
group = df_power.groupby(by="node")
available_nodes = group.groups.keys()
#for debugging
#result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]
#i know that 0 is node, 1 is start_time, and 2 is end_time
result = [any(group.get_group(x[0])["timestamp"].between(x[1], x[2])) if x[0] in available_nodes else False for x in sample[["node", "start_time", "end_time"]].values.tolist()]
sample["has_profile"] = result
df_jobs_f12_single = sample[sample["has_profile"]==True]
print("Number of jobs after filter 2 - single: ", str(len(df_jobs_f12_single)))
return df_jobs_f12_single
"""
Filter 2: remove jobs with no energy profile
Method for multi-node jobs
"""
def filter2_multi(df_jobs, df_power):
df_jobs_f1_multi = df_jobs[df_jobs["num_nodes"] > 1].copy().reset_index(drop=True)
#removing malformed "nodes"
df_jobs_f1_multi["node"] = [ast.literal_eval(x) for x in df_jobs_f1_multi['nodes']]
#use this for smaller inputs
#sample = df_jobs_f1_multi[0:10].copy().reset_index(drop=True)
sample = df_jobs_f1_multi
group = df_power.groupby(by="node")
available_nodes = set(group.groups.keys())
#for debugging
#result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]
#i know that 0 is node, 1 is start_time, and 2 is end_time
result = [all([any(group.get_group(y)["timestamp"].between(x[1], x[2])) for y in x[0]]) if set(x[0]).issubset(available_nodes) else False for x in sample[["node", "start_time", "end_time"]].values.tolist()]
sample["has_profile"] = result
df_jobs_f12_multi = sample[sample["has_profile"]==True]
print("Number of jobs after filter 2 - multi: ", str(len(df_jobs_f12_multi)))
return df_jobs_f12_multi
"""
Save intermediate results to csv
"""
def save_results(df_jobs_single, df_jobs_multi, jobfile, metricfile):
jobfile_out = jobfile.rstrip("a_0.parquet")
metric = metricfile.split("/")[-2]
df_jobs_single.to_csv(jobfile_out+metric+"_filter12_singlenode.csv", index=False)
df_jobs_multi.to_csv(jobfile_out+metric+"_filter12_multinode.csv", index=False)
"""
Run workflow
"""
def run_workflow(metricfile, jobfile):
df_jobs, df_power = read_data(jobfile=jobfile, metricfile=metricfile)
df_jobs, df_power = preprocess_data(df_jobs=df_jobs, df_power=df_power)
df_jobs = filter1(df_jobs)
df_jobs_single = filter2_single(df_jobs, df_power)
df_jobs_multi = filter2_multi(df_jobs, df_power)
save_results(df_jobs_single, df_jobs_multi, jobfile, metricfile)
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Process ExaData data to extract per-job energy profiles')
p.add_argument("--metricfile", "-m", type=str, required=True,
help="Metric file")
p.add_argument("--jobfile", "-j", type=str, required=True,
help="Job table file")
return(p.parse_args())
if __name__ == '__main__':
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
try:
args = read_cli()
except:
print('Try $python process_marconi_jobs.py --help')
sys.exit(1)
run_workflow(args.metricfile, args.jobfile)
import argparse
import sys
import ast
import multiprocessing
import functools
import numpy as np
import pandas as pd
"""
Read data
Output from process_marconi_jobs_1.py
"""
def read_data(jobfile_single, jobfile_multi, metricfile):
df_jobs_single = pd.read_csv(jobfile_single)
df_jobs_multi = pd.read_csv(jobfile_multi)
## Here i refer to total_power but in reality it can be any metric
df_total_power = pd.read_parquet(metricfile)
df_total_power['node'] = pd.to_numeric(df_total_power['node'])
df_total_power['value'] = pd.to_numeric(df_total_power['value'])
return df_jobs_single, df_jobs_multi, df_total_power
"""
Filter 3: find the jobs that share a node
The idea is to add a new column to `df_total_power`
with a list of jobs that run on each node at each timestamp.
Then we filter jobs that appear sharing with another job
"""
def filter3_1_single(df_jobs_single, df_total_power):
#removing malformed "nodes"
df_jobs_single["node"] = [ast.literal_eval(x)[0] for x in df_jobs_single['nodes']]
#use this for smaller inputs
#TEST_SAMPLE_SIZE = 1000
TEST_SAMPLE_SIZE = len(df_jobs_single)
sample = df_jobs_single[0:TEST_SAMPLE_SIZE].copy().reset_index(drop=True)
#sample = df_jobs_single
group = df_total_power.groupby(by="node")
#for debugging
#result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]
#i know that x[0] is node, x[1] is start_time, and x[2] is end_time
#each element of the list is a subseries of the total_power of a certain mode
#with "1" at the timestamps where the "job_id" ran
result = [[group.get_group(x[0])["timestamp"].between(x[1], x[2]).astype(int).replace(0, np.nan).dropna().astype(str), x[3]] for x in sample[["node", "start_time", "end_time", "job_id"]].values.tolist()]
# now i replace the "1"s with the "job_id"
#each element of the list is a subseries of the total_power of a certain mode
#with the "job_id" value at the timestamps where the "job_id" ran
result2 = [x[0].replace("1.0", str(x[1])) for x in result]
#print(result2)
#finally i concatenate each of the series by index
# while joining the values where indices overlap
# i hope the indices here are consistent with the indices in df_total_power
#concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).replace(to_replace=r'^(0,)*0$', value="0", regex=True)
concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).rename("job_ids")
joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="inner")
print(joined_total_power)
return joined_total_power
"""
Filter 3: find the jobs that not share a node
Based on filter3_single_1, return the list of job ids that
not shared a node with another job
"""
def filter3_2(joined_total_power):
all_job_ids=[]
_ = pd.Series([[all_job_ids.append(y) for y in x.split(",")] for x in joined_total_power["job_ids"].drop_duplicates().values.tolist()])
all_job_ids = pd.Series(all_job_ids).drop_duplicates()
nodeshare_job_ids = []
_ = pd.Series([[nodeshare_job_ids.append(y) for y in x.split(",") if len(x.split(",")) > 1] for x in joined_total_power["job_ids"].drop_duplicates().values.tolist()])
nodeshare_job_ids = pd.Series(nodeshare_job_ids).drop_duplicates()
exclusive_job_ids = all_job_ids[~all_job_ids.isin(nodeshare_job_ids)]
print("Nodeshare Job ratio: ", len(nodeshare_job_ids)/len(all_job_ids))
print(exclusive_job_ids)
return exclusive_job_ids
"""
Filter 3: find the jobs that not share a node
Getting the profile of the not nodeshared jobs
And details of the exclusive jobs
"""
def filter3_3(df_jobs_single, exclusive_job_ids, joined_total_power):
result = [x for x in joined_total_power.values if np.any(pd.Series(x[3].split(",")).isin(exclusive_job_ids).values) == True]
df_total_power_exclusive_single = pd.DataFrame(result, columns=["timestamp", "value", "node", "job_id"])
df_exclusive_jobs = df_jobs_single[df_jobs_single["job_id"].isin(exclusive_job_ids.astype(int))]
return df_total_power_exclusive_single, df_exclusive_jobs
"""
Filter 3: Define the function to check the condition in parallel
"""
def check_condition_f3(x, exclusive_job_ids=None):
return np.any(pd.Series(x[3].split(",")).isin(exclusive_job_ids).values)
"""
Filter 3: find the jobs that not share a node
Getting the profile of the not nodeshared jobs
And details of the exclusive jobs
Attempt to do this in parallel
"""
def filter3_3_par(df_jobs, exclusive_job_ids, joined_total_power):
# Use multiprocessing.Pool to parallelize the list comprehension
pool = multiprocessing.Pool()
result = pool.map(functools.partial(check_condition_f3, exclusive_job_ids=exclusive_job_ids), joined_total_power.values)
# Filter the values based on the condition
result = [x for x, res in zip(joined_total_power.values, result) if res]
df_total_power_exclusive_single = pd.DataFrame(result, columns=["timestamp", "value", "node", "job_id"])
df_exclusive_jobs = df_jobs[df_jobs["job_id"].isin(exclusive_job_ids.astype(int))]
return df_total_power_exclusive_single, df_exclusive_jobs
"""
Filter 3: find the jobs that share a node
The idea is to add a new column to `df_total_power`
with a list of jobs that run on each node at each timestamp.
Then we filter jobs that appear sharing with another job
Version for multi-node jobs
"""
def filter3_1_multi(df_jobs_multi, df_total_power):
df_jobs_multi["node"] = [ast.literal_eval(x) for x in df_jobs_multi['nodes']]
#use this for smaller inputs
#TEST_SAMPLE_SIZE = 1000
TEST_SAMPLE_SIZE = len(df_jobs_multi)
sample = df_jobs_multi[0:TEST_SAMPLE_SIZE].copy().reset_index(drop=True)
#sample = df_jobs_single
group = df_total_power.groupby(by="node")
#for debugging
#result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]
#i know that x[0] is the node list, x[1] is start_time, and x[2] is end_time
#each element of the list is a subseries of the total_power of a certain mode
#with "1" at the timestamps where the "job_id" ran
# i replace the "0"s (where the timestamp is not betweenx[1], x[2]) with nans just to remove these values with dropna.
result = [[[group.get_group(y)["timestamp"].between(x[1], x[2]).astype(int).replace(0, np.nan).dropna().astype(str) for y in x[0]], x[3]] for x in sample[["node", "start_time", "end_time", "job_id"]].values.tolist()]
#print(type(group.get_group(512)))
#group.get_group(x[0])
#result[0] = Series with timestamps with True where the job run, and False otherwise
#result[1] = job id
#result.values.to_list()
# now i replace the "1"s with the "job_id"
#each element of the list is a subseries of the total_power of a certain mode
#with the "job_id" value at the timestamps where the "job_id" ran
result2 = [pd.concat([y.replace("1.0", str(x[1])) for y in x[0]]) for x in result]
#finally i concatenate each of the series by index
# while joining the values where indices overlap
# i hope the indices here are consistent with the indices in df_total_power
#concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).replace(to_replace=r'^(0,)*0$', value="0", regex=True)
concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).rename("job_ids")
#print(concatenated_series)
# use the below code for the full run
#joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="left")
#use the below code for a rest run
joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="inner")
print(joined_total_power)
return joined_total_power
"""
Save results to csv
"""
def save_results(df_exclusive_jobs_single, df_exclusive_jobs_multi, df_total_power_exclusive_single, df_total_power_exclusive_multi, jobfile_single, metricfile):
metric = metricfile.split("/")[-2]
jobfile_out = jobfile_single.rstrip(metric+"_filter12_singlenode.csv")
metricfile_out = metricfile.rstrip("a_0.parquet")
df_exclusive_jobs_single.to_csv(jobfile_out+metric+"_filter123_singlenode.csv", index=False)
df_exclusive_jobs_multi.to_csv(jobfile_out+metric+"_filter123_multinode.csv", index=False)
df_total_power_exclusive_single.to_csv(metricfile_out+"a_0_filter123_singlenode.csv", index=False)
df_total_power_exclusive_multi.to_csv(metricfile_out+"a_0_filter123_multinode.csv", index=False)
"""
Run workflow
"""
def run_workflow(metricfile, jobfile_single, jobfile_multi):
df_jobs_single, df_jobs_multi, df_total_power = read_data(jobfile_single, jobfile_multi, metricfile)
#Single-node jobs workflow
joined_total_power = filter3_1_single(df_jobs_single, df_total_power)
exclusive_job_ids = filter3_2(joined_total_power)
#df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_single_3(df_jobs_single, exclusive_job_ids, joined_total_power)
df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_3_par(df_jobs_single, exclusive_job_ids, joined_total_power)
print(df_total_power_exclusive_single)
print(df_exclusive_jobs_single)
###############################
#Multi-node jobs workflow
joined_total_power = filter3_1_multi(df_jobs_multi, df_total_power)
exclusive_job_ids = filter3_2(joined_total_power)
#df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_single_3(df_jobs_single, exclusive_job_ids, joined_total_power)
df_total_power_exclusive_multi, df_exclusive_jobs_multi = filter3_3_par(df_jobs_multi, exclusive_job_ids, joined_total_power)
print(df_total_power_exclusive_multi)
print(df_exclusive_jobs_multi)
save_results(df_exclusive_jobs_single, df_exclusive_jobs_multi, df_total_power_exclusive_single, df_total_power_exclusive_multi, jobfile_single, metricfile)
###############################
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Process ExaData data to extract per-job energy profiles')
p.add_argument("--metricfile", "-m", type=str, required=True,
help="Metric file")
p.add_argument("--jobfilesingle", "-js", type=str, required=True,
help="Job table file for single-node jobs (output from process_marconi_jobs_1.py)")
p.add_argument("--jobfilemulti", "-jm", type=str, required=True,
help="Job table file for multi-node jobs (output from process_marconi_jobs_1.py)")
return(p.parse_args())
if __name__ == '__main__':
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
try:
args = read_cli()
except:
print('Try $python process_marconi_jobs.py --help')
sys.exit(1)
run_workflow(args.metricfile, args.jobfilesingle, args.jobfilemulti)
import pandas as pd
import subprocess
import sys
import argparse
import os
import functools
from multiprocessing import Pool
"""
User power prediction subroutine
"""
def run_prediction_user(user_id, jobfile=None, outdir=None):
outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz'
command = ['python', 'predict_jobs_power.py', "-j", jobfile , '-o', outfile, '-u', str(user_id)]
subprocess.run(command)
"""
Run Workflow
"""
def run_workflow(jobfile, outdir):
# Load the dataframe with "user_id" column
df = pd.read_csv(jobfile)
# Iterate over each user_id and run predict_jobs_power_svr.py
for user_id in df['user_id'].drop_duplicates():
outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz'
# Skip user if prediction was already performed
if os.path.isfile(outfile) == True:
continue
command = ['python', 'predict_jobs_power_allmethods_max.py', "-j", jobfile , '-o', outfile, '-u', str(user_id)]
subprocess.run(command)
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Run the sophisticated power prediction method, but one call for each user')
p.add_argument("--jobfile", "-j", type=str, required=True,
help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)")
p.add_argument("--outdir", "-o", type=str, required=True,
help="Directory of the output files")
return(p.parse_args())
if __name__ == '__main__':
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
try:
args = read_cli()
except:
print('Try $python run_prediction_per_user --help')
sys.exit(1)
run_workflow(args.jobfile, args.outdir)
import pandas as pd
import subprocess
import sys
import argparse
import os
import functools
from multiprocessing import Pool
"""
User power prediction subroutine
"""
def run_prediction_user(user_id, jobfile=None, outdir=None):
outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz'
command = ['python', 'predict_jobs_power.py', "-j", jobfile , '-o', outfile, '-u', str(user_id)]
subprocess.run(command)
"""
Run Workflow
"""
def run_workflow(jobfile, outdir):
# Load the dataframe with "user_id" column
df = pd.read_csv(jobfile)
# Iterate over each user_id and run predict_jobs_power_svr.py
for user_id in df['user_id'].drop_duplicates():
outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz'
# Skip user if prediction was already performed
if os.path.isfile(outfile) == True:
continue
command = ['python', 'predict_jobs_power_allmethods_mean.py', "-j", jobfile , '-o', outfile, '-u', str(user_id)]
subprocess.run(command)
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Run the sophisticated power prediction method, but one call for each user')
p.add_argument("--jobfile", "-j", type=str, required=True,
help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)")
p.add_argument("--outdir", "-o", type=str, required=True,
help="Directory of the output files")
return(p.parse_args())
if __name__ == '__main__':
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
try:
args = read_cli()
except:
print('Try $python run_prediction_per_user --help')
sys.exit(1)
run_workflow(args.jobfile, args.outdir)