Skip to content
Snippets Groups Projects
m100_pred_preprocess_1.py 4.96 KiB
#!/usr/bin/env python3
import sys
import argparse
import ast
import pandas as pd
import numpy as np

"""
Read Data
"""
def read_data(jobfile, metricfile):
    df_jobs = pd.read_parquet(jobfile)
    df_power = pd.read_parquet(metricfile)
    ## I call here df_power, but in reality it can be any metric    
    return df_jobs, df_power


"""
Convert some string values to numeric
Convert timestamps to seconds for job runtime
"""
def preprocess_data(df_jobs, df_power):    
    df_power['node'] = pd.to_numeric(df_power['node'])
    df_power['value'] = pd.to_numeric(df_power['value'])
    df_jobs["run_time"] = (df_jobs['end_time']-df_jobs['start_time']) / np.timedelta64(1, 's')
    print("Total number of jobs: ", len(df_jobs))
    return df_jobs, df_power


"""
Filter 1: remove jobs that take less than a minute
"""
def filter1(df_jobs):
    SECONDS_ONE_MINUTE = 60.0

    df_jobs_f1 = df_jobs.loc[df_jobs["run_time"] >= SECONDS_ONE_MINUTE]
    print("Number of jobs after filter 1: ", str(len(df_jobs_f1)))
    return df_jobs_f1


"""
Filter 2: remove jobs with no energy profile
Method for single-node jobs
"""
def filter2_single(df_jobs, df_power):
    df_jobs_f1_single = df_jobs[df_jobs["num_nodes"]==1]

    #removing malformed "nodes"
    df_jobs_f1_single = df_jobs_f1_single[df_jobs_f1_single["nodes"].str.match("\[\d+\]")].copy().reset_index(drop=True)
    df_jobs_f1_single["node"] = [ast.literal_eval(x)[0] for x in df_jobs_f1_single['nodes']]

    #use this for smaller inputs
    #sample = df_jobs_f1_single[0:10000].copy().reset_index(drop=True)

    sample = df_jobs_f1_single

    group = df_power.groupby(by="node")

    available_nodes = group.groups.keys()    
 
    #for debugging
    #result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]

    #i know that 0 is node, 1 is start_time, and 2 is end_time
    result = [any(group.get_group(x[0])["timestamp"].between(x[1], x[2])) if x[0] in available_nodes else False for x in sample[["node", "start_time", "end_time"]].values.tolist()]
    
    sample["has_profile"] = result
    df_jobs_f12_single = sample[sample["has_profile"]==True]

    print("Number of jobs after filter 2 - single: ", str(len(df_jobs_f12_single)))
    return df_jobs_f12_single


"""
Filter 2: remove jobs with no energy profile
Method for multi-node jobs
"""
def filter2_multi(df_jobs, df_power):    
    df_jobs_f1_multi = df_jobs[df_jobs["num_nodes"] > 1].copy().reset_index(drop=True)

    #removing malformed "nodes"   
    df_jobs_f1_multi["node"] = [ast.literal_eval(x) for x in df_jobs_f1_multi['nodes']]
    
    #use this for smaller inputs
    #sample = df_jobs_f1_multi[0:10].copy().reset_index(drop=True)

    sample = df_jobs_f1_multi

    group = df_power.groupby(by="node")

    available_nodes = set(group.groups.keys())

    #for debugging
    #result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]

    #i know that 0 is node, 1 is start_time, and 2 is end_time
    result = [all([any(group.get_group(y)["timestamp"].between(x[1], x[2])) for y in x[0]]) if set(x[0]).issubset(available_nodes) else False for x in sample[["node", "start_time", "end_time"]].values.tolist()]

    sample["has_profile"] = result
    df_jobs_f12_multi = sample[sample["has_profile"]==True]
    print("Number of jobs after filter 2 - multi: ", str(len(df_jobs_f12_multi)))
    return df_jobs_f12_multi


"""
Save intermediate results to csv
"""
def save_results(df_jobs_single, df_jobs_multi, jobfile, metricfile):       
    jobfile_out = jobfile.rstrip("jobs.parquet")      
    df_jobs_single.to_csv(jobfile_out+"_filter12_singlenode.csv", index=False)
    df_jobs_multi.to_csv(jobfile_out+"_filter12_multinode.csv", index=False)

"""
Run workflow
"""
def run_workflow(metricfile, jobfile):
    df_jobs, df_power = read_data(jobfile=jobfile, metricfile=metricfile)
    df_jobs, df_power = preprocess_data(df_jobs=df_jobs, df_power=df_power)
    df_jobs = filter1(df_jobs)
    df_jobs_single = filter2_single(df_jobs, df_power)
    df_jobs_multi = filter2_multi(df_jobs, df_power)
    save_results(df_jobs_single, df_jobs_multi, jobfile, metricfile)    

"""
Read Command line interface
"""
def read_cli():
    # Make parser object
    p = argparse.ArgumentParser(description='Process ExaData data to extract per-job energy profiles')    
    
    p.add_argument("--metricfile", "-m", type=str, required=True,
                   help="Metric file")
    p.add_argument("--jobfile", "-j", type=str, required=True,
                   help="Job table file")
    
    return(p.parse_args())

if __name__ == '__main__':
    
    if sys.version_info<(3,5,0):
        sys.stderr.write("You need python 3.5 or later to run this script\n")
        sys.exit(1)
        
    try:
        args = read_cli()        
    except:
        print('Try $python process_marconi_jobs.py --help')
        sys.exit(1)

    run_workflow(args.metricfile, args.jobfile)