# expetator/bundle.py -- experiment-bundle loading and time-series merging.
import zipfile

import numpy as np
import pandas as pd


def init_bundle(bundlename):
    """Read an experiment file, optionally packed in a .zip archive.

    Returns an (experiments, zip_fid) pair: the experiment table as a
    DataFrame, plus the open ZipFile (None for a plain file) so callers
    can later read the monitoring files stored alongside it.
    """
    if bundlename.endswith('.zip'):
        # Fix: the original used zipfile without importing it, so any
        # .zip bundle raised NameError.
        zip_fid = zipfile.ZipFile(bundlename, 'r')
        # The table inside the archive is named like the archive minus '.zip'.
        bundlename = bundlename[:-4]
        experiments = pd.read_csv(zip_fid.open(bundlename), sep=' ')
    else:
        zip_fid = None
        experiments = pd.read_csv(bundlename, sep=' ')

    experiments['basename'] = bundlename
    return experiments, zip_fid


def merge_timeseries_blocks(references, additions, prefix='add_', key='#timestamp'):
    """Apply merge_timeseries() host by host over two experiment blocks.

    Both arguments are nested lists indexed [experiment][host]; the two
    nestings are assumed to have the same shape.
    """
    return [
        [
            merge_timeseries(references[experiment_id][host_id],
                             additions[experiment_id][host_id], prefix, key)
            for host_id in range(len(references[experiment_id]))
        ]
        for experiment_id in range(len(references))
    ]


def merge_timeseries(reference, addition, prefix='add_', key='#timestamp'):
    """Interpolate the columns of *addition* onto *reference*'s timeline.

    The two series are aligned on their first timestamp, then every
    non-key column of *addition* is linearly interpolated at the
    reference timestamps and stored as '<prefix><column>' in a copy of
    *reference*, which is returned.

    Fix: removed the unreachable delta/span debugging statements that
    followed the return in the original version.
    """
    reference = reference.copy()

    # Align the two clocks on their first sample (.iloc so a non-default
    # index cannot break the positional lookup).
    delta = reference[key].iloc[0] - addition[key].iloc[0]
    intermediate_timestamps = reference[key] - delta

    for column in addition:
        if column == key:
            continue
        data = np.interp(intermediate_timestamps, addition[key], addition[column])
        reference[prefix + column] = data

    return reference


def normy(focus):
    """Min-max normalise every column of *focus* except '#timestamp'."""
    df = focus.loc[:, focus.columns != '#timestamp']
    norm_focus = (df - df.min()) / (df.max() - df.min())
    norm_focus['#timestamp'] = focus['#timestamp']
    return norm_focus
# Cleaned-up content of the three remaining modules added by this patch:
# expetator/monitoring_csv.py, expetator/monitoring_list.py and
# expetator/watermark.py.
import json

import numpy as np
import pandas as pd

# TODO: put watermark detection at delta=0 if corr<.6


# --- expetator/monitoring_csv.py -------------------------------------------

def read_host_csv(prefix, hostname, startTime, basename, fullname, archive_fid=None):
    """Read one mojitO/S-like monitoring CSV for a single host.

    The file lives at '<basename>_<prefix>/<hostname>_<fullname>_<startTime>',
    either on disk or inside the open ZipFile *archive_fid*.  Columns
    containing NaN (ragged rows) are dropped.
    """
    fullpath = '%s_%s/%s_%s_%s' % (basename, prefix, hostname, fullname, startTime)
    if archive_fid is None:
        # Fix: the original opened the file as file_id and then ignored the
        # handle, re-opening the path through pd.read_csv.
        data = pd.read_csv(fullpath, sep=' ', skipinitialspace=True)
    else:
        with archive_fid.open(fullpath) as file_id:
            # NOTE(review): unlike the plain-file branch, this one never used
            # skipinitialspace -- kept as-is to preserve behaviour; confirm
            # whether archived files are space-padded.
            data = pd.read_csv(file_id, sep=' ')
    data = data.dropna(axis='columns')
    return data


def read_run_csv(prefix, hostname, startTime, basename, fullname, hostlist, archive_fid=None):
    """Read the monitoring CSVs of every host in *hostlist* (';'-separated)."""
    return [read_host_csv(prefix, host, startTime, basename, fullname, archive_fid)
            for host in hostlist.split(';')]


def read_bundle_csv(prefix, bundle, archive_fid=None):
    'Reads mojitO/S-like files associated to a bundle'
    return [read_run_csv(prefix, row.hostname, row.startTime, row.basename,
                         row.fullname, row.hostlist, archive_fid)
            for _, row in bundle.iterrows()]


# --- expetator/monitoring_list.py ------------------------------------------

def read_run_list(prefix, hostname, startTime, basename, fullname, hostlist=None, archive_fid=None):
    """Read a JSON power file and return one DataFrame per host of *hostlist*.

    The JSON payload is a list of (host, timestamps, values) triples; each
    returned frame has columns ['#timestamp', prefix].  Missing or malformed
    files (and unknown hosts) yield whatever was read so far -- this call is
    best-effort by design.
    """
    fullpath = '%s_%s/%s_%s_%s' % (basename, prefix, hostname, fullname, startTime)
    result = []

    try:
        if archive_fid is None:
            with open(fullpath) as file_id:
                raw_data = json.loads(file_id.read())
        else:
            with archive_fid.open(fullpath) as file_id:
                raw_data = json.loads(file_id.read())

        data = {host: (timestamp, values) for (host, timestamp, values) in raw_data}

        for host in hostlist.split(';'):
            # Hosts are listed fully qualified; the JSON uses short names.
            name, _ = host.split('.', maxsplit=1)
            df = pd.DataFrame(data[name]).transpose()
            df.columns = ["#timestamp", prefix]
            result.append(df)
    # Fix: narrowed the original bare 'except:' to the failures this
    # best-effort read is meant to tolerate (missing file, bad JSON,
    # unknown host, hostname without a dot, hostlist left to None).
    except (OSError, KeyError, ValueError, AttributeError, TypeError):
        pass

    return result


def read_bundle_list(prefix, bundle, archive_fid=None):
    'Reads the power files associated to a bundle'
    return [read_run_list(prefix, row.hostname, row.startTime, row.basename,
                          row.fullname, row.hostlist, archive_fid)
            for _, row in bundle.iterrows()]


# --- expetator/watermark.py ------------------------------------------------

def df_to_vector(df):
    """Collapse a multi-column frame into a 1-D signal (first PCA component)."""
    # Fix: heavy dependency imported lazily so the rest of this code runs
    # without scikit-learn installed.
    from sklearn import decomposition

    # Fix: explicit column check instead of the original bare try/except
    # around the drop.
    if '#timestamp' in df.columns:
        tmp = df.drop('#timestamp', axis=1)
    else:
        tmp = df
    pca = decomposition.PCA(n_components=1)
    pca.fit(tmp)
    return pca.transform(tmp).flatten()


def get_wm_ref(total_time=30, nb_steps=2, freq=10):
    """Build the reference watermark: nb_steps high plateaus between low ones.

    The signal alternates -1 (low) and +1 (high) plateaus over *total_time*
    seconds sampled at *freq* Hz, starting and ending low.
    """
    low = int(total_time / (2 * nb_steps + 1))
    high = (total_time - low * (nb_steps + 1)) / nb_steps

    mark = np.array([])
    for _ in range(nb_steps):
        mark = np.append(mark, [-1] * int(low * freq))
        mark = np.append(mark, [1] * int(high * freq))
    mark = np.append(mark, [-1] * int(low * freq))
    return mark


def get_shift(dataframe, frequency=10, duration=30, backward=False):
    """Locate the watermark window at the start (or end) of *dataframe*.

    Returns an (index, corr) pair: the row index in *dataframe* where the
    watermark ends (begins, when backward) and the best correlation found.
    Falls back to the unshifted cut point with corr=0 when the best
    correlation stays below 0.7.
    """
    # Search with 4x oversampling for sub-sample alignment.
    precision = 4
    timeframe = np.arange(0, duration, 1 / (frequency * precision))

    # Reference watermark on the oversampled timeframe.
    watermark = get_wm_ref(freq=frequency * precision)

    # Regularly interpolated data in the searched window.
    ts = dataframe['#timestamp'].to_numpy()
    if backward:
        pos_in_data = np.searchsorted(ts, max(ts) - duration)
        tmp_ts = ts[pos_in_data:]
        tmp_data = df_to_vector(dataframe[pos_in_data:])
    else:
        pos_in_data = np.searchsorted(ts, min(ts) + duration)
        tmp_ts = ts[:pos_in_data + 1]
        tmp_data = df_to_vector(dataframe[:pos_in_data + 1])

    data = np.interp(timeframe + min(tmp_ts), tmp_ts, tmp_data)

    deltamax = len(timeframe) // (2 * precision)
    current = None
    res = None
    for delta in range(-deltamax, deltamax + 1):
        # Correlate the shifted data against the watermark; the sign of the
        # correlation is kept in the result, the magnitude drives the search.
        if delta <= 0:
            coeff = np.corrcoef(data[:len(timeframe) + delta],
                                watermark[-delta:])[0][1]
        else:
            coeff = np.corrcoef(data[delta:],
                                watermark[:len(timeframe) - delta])[0][1]

        if current is None or abs(coeff) > current:
            current = abs(coeff)
            if backward:
                res = (np.searchsorted(ts, max(ts) - duration + delta / (precision * frequency)),
                       coeff)
            else:
                res = (np.searchsorted(ts, min(ts) + duration + delta / (precision * frequency)),
                       coeff)

    # Correlation too weak: assume no detectable watermark shift.
    if current < .7:
        res = pos_in_data, 0

    return res


def remove_watermark(dataframe, frequency=10, duration=30):
    """Strip the leading and trailing watermark windows from *dataframe*."""
    if len(dataframe) == 0:
        return dataframe
    delta_deb, _conf_deb = get_shift(dataframe, frequency, duration)
    delta_fin, _conf_fin = get_shift(dataframe, frequency, duration, backward=True)
    return dataframe[delta_deb:delta_fin + 1].reset_index(drop=True)


def remove_watermark_blocks(block, frequency=10, duration=30):
    """Apply remove_watermark() to every [experiment][host] frame of *block*."""
    return [
        [
            remove_watermark(block[experiment][host], frequency, duration)
            for host in range(len(block[experiment]))
        ]
        for experiment in range(len(block))
    ]


## Tool for virtualisation

def demo_watermark_detection(focus, freq):
    """Plot *focus*, its detected watermark boundaries and the reference mark.

    Demo helper: draws on matplotlib and prints the un-watermarked span.
    """
    # Demo-only dependency, imported lazily.
    import matplotlib.pyplot as plt

    df = focus.loc[:, focus.columns != '#timestamp']
    norm_focus = (df - df.min()) / (df.max() - df.min())
    norm_focus['#timestamp'] = focus['#timestamp']
    fig, ax = plt.subplots()
    norm_focus.plot(x='#timestamp', figsize=(10, 6), ax=ax)

    delta_deb, conf_deb = get_shift(focus, freq)
    delta_fin, conf_fin = get_shift(focus, freq, backward=True)

    # The actual (un-watermarked) data, highlighted.
    norm_focus[delta_deb:delta_fin + 1].plot(x='#timestamp', linewidth=4, ax=ax)

    # Reference watermark rescaled from [-1, 1] to [0, 1] for plotting.
    watermark_reference = (get_wm_ref(freq=freq) + 1) / 2

    # Start marker.
    w_start = min(focus[delta_deb:delta_fin + 1]['#timestamp'])
    plt.plot(np.linspace(w_start - 30, w_start - 1 / freq, len(watermark_reference)),
             watermark_reference)
    plt.vlines(w_start, 0, 1)
    plt.vlines(w_start - 1 / freq, 0, 1)

    # End marker.
    w_end = max(focus[delta_deb:delta_fin + 1]['#timestamp'])
    plt.plot(np.linspace(w_end + 1 / freq, w_end + 30, len(watermark_reference)),
             watermark_reference)
    plt.vlines(w_end, 0, 1)
    plt.vlines(w_end + 1 / freq, 0, 1)

    print(w_end - w_start)