diff --git a/paradise/main.py b/paradise/main.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..b7eaf01d49f118fa4326d033f6dfbc76c736b4ed 100755
--- a/paradise/main.py
+++ b/paradise/main.py
@@ -0,0 +1,186 @@
+#!/usr/bin/env python
+"""
+This module is the entry point for generating time series, training algorithms and computing results.
+"""
+import json
+import os
+
+from argparse import ArgumentParser
+from datetime import datetime
+
+import matplotlib.pyplot as plt
+import numpy as np
+import pandas as pd
+
+from generator import DatasetGenerator
+from split import BaseSplitter
+from trainers import BaseTrainer
+from results import ResultExtractor
+
+plt.rcParams["figure.figsize"] = (20, 10)
+
+
+if __name__ == "__main__":
+    # =================================================================================================================
+    # Fetch arguments from CLI
+    # =================================================================================================================
+    # Create the parser
+    parser = ArgumentParser(prog="Time Series Generator", description="Create time series.")
+
+    # Add arguments
+    parser.add_argument(
+        "task",
+        help="Either generate data, train an algorithm, compute the results of an execution, or do everything.",
+        choices=["generate", "train", "results", "all"]
+    )
+    parser.add_argument(
+        "-c", "--config",
+        help="Path to config file(s) to use to generate data.",
+        default=["config/2d-mix.json"],
+        nargs="+"
+    )
+    parser.add_argument("-a", "--algorithms", help="Which algorithm(s) to train.", default=["kmeans"], nargs="+")
+    parser.add_argument(
+        "-m", "--cluster-method",
+        help="Which clustering algorithm to use.",
+        default="HDBSCAN",
+        dest="method"
+    )
+    parser.add_argument("-i", "--input", help="Input directory. Only to be used when no data will be generated.")
+    parser.add_argument("-o", "--output", help="Output directory.")
+    parser.add_argument(
+        "-s", "--split",
+        help="Automatically split the dataset into subsystems and treat them one at a time.",
+        action="store_true"
+    )
+    parser.add_argument(
+        "-optim",
+        help="Optimize the hyperparameters according to what is written in the algorithm_params.json file.",
+        action="store_true"
+    )
+    parser.add_argument(
+        "-d", "--docker",
+        help="Use Docker containers directly to run the algorithm. Allows running algorithms without cloning the repo.",
+        action="store_true"
+    )
+
+    # Load args
+    args = parser.parse_args()
+
+    # Prepare output directory
+    if args.output is None:
+        OUTPUT_DIR = f"output/{datetime.now().strftime('%Y-%m-%d.%H-%M-%S')}"
+    else:
+        OUTPUT_DIR = f"output/{args.output}"
+
+    # Prepare input directory if needed
+    if args.task not in ["generate", "all"] and args.input is None:
+        raise ValueError("Cannot skip data generation without providing an input directory")
+
+    if args.task not in ["generate", "all"]:
+        INPUT_DIR = f"output/{args.input}"
+    else:
+        INPUT_DIR = OUTPUT_DIR
+
+    # =================================================================================================================
+    # Generate Data
+    # =================================================================================================================
+    # Loop over config files
+    for config_file in args.config:
+        # Compute config name
+        config_name = config_file.split("/")[-1][:-5] if "." in config_file else config_file.split("/")[-1]
+
+        if args.task in ["generate", "all"]:
+            # Create output dir
+            os.makedirs(f"{OUTPUT_DIR}/{config_name}", exist_ok=True)
+
+            # Read config file
+            with open(config_file, "r", encoding="utf-8") as f:
+                config = json.load(f)
+
+            # Data generation
+            generator = DatasetGenerator(config)
+
+            dataset, train_dataset, labels, variables_labels = generator.generate()
+            subsystems, splitted_data, splitted_train, splitted_labels = generator.get_splitted_data()
+
+            # Save data to disk
+            # Prepare the data
+            columns = list(range(0, dataset.shape[0]))
+            indices = list(range(0, dataset.shape[1]))
+            t_indices = list(range(0, train_dataset.shape[1]))
+            df_test = pd.DataFrame(data=dataset.T, index=indices, columns=columns)
+            df_test["is_anomaly"] = labels
+            df_train = pd.DataFrame(data=train_dataset.T, index=t_indices, columns=columns)
+            df_train["is_anomaly"] = np.zeros(train_dataset.shape[1])
+
+            df_test.to_csv(f"{OUTPUT_DIR}/{config_name}/dataset.csv", index_label="Timestamp")
+            df_train.to_csv(f"{OUTPUT_DIR}/{config_name}/dataset_train.csv", index_label="Timestamp")
+            pd.DataFrame(data=labels).to_csv(f"{OUTPUT_DIR}/{config_name}/dataset_labels.csv", index=False)
+            pd.DataFrame(data=variables_labels.T)\
+                .to_csv(f"{OUTPUT_DIR}/{config_name}/dataset_variables_labels.csv", index=False)
+
+            # Plot data and save it to disk
+            for dimension in dataset:
+                plt.plot(dimension)
+            plt.savefig(f"{OUTPUT_DIR}/{config_name}/dataset.png")
+            plt.clf()
+
+            for dimension in train_dataset:
+                plt.plot(dimension)
+            plt.savefig(f"{OUTPUT_DIR}/{config_name}/train_dataset.png")
+            plt.clf()
+
+            # Handle split data
+            for idx, (data, train, lab) in enumerate(zip(splitted_data, splitted_train, splitted_labels)):
+                columns = list(range(0, data.shape[0]))
+                df_test_s = pd.DataFrame(data=data.T, index=indices, columns=columns)
+                df_test_s["is_anomaly"] = lab
+                df_train_s = pd.DataFrame(data=train.T, index=t_indices, columns=columns)
+                df_train_s["is_anomaly"] = np.zeros(train.shape[1])
+
+                df_test_s.to_csv(f"{OUTPUT_DIR}/{config_name}/dataset_{idx}.csv", index_label="Timestamp")
+                df_train_s.to_csv(f"{OUTPUT_DIR}/{config_name}/dataset_{idx}_train.csv", index_label="Timestamp")
+                pd.DataFrame(data=lab).to_csv(f"{OUTPUT_DIR}/{config_name}/dataset_{idx}_labels.csv", index=False)
+
+            with open(f"{OUTPUT_DIR}/{config_name}/subsystems.txt", "w", encoding="utf-8") as f:
+                clusters = np.zeros(dataset.shape[0])
+                for idx, cluster in enumerate(subsystems):
+                    for member in cluster:
+                        clusters[member] = idx
+                f.write(json.dumps(clusters.tolist()))
+
+
+        # =================================================================================================================
+        # Split data
+        # =================================================================================================================
+
+        if args.split and args.task in ["train", "all"]:
+            splitter = BaseSplitter(f"{INPUT_DIR}/{config_name}")
+            splitter.split_data(method=args.method)
+
+        # =================================================================================================================
+        # Train algorithm
+        # =================================================================================================================
+        if args.task in ["train", "all"]:
+            trainers = []
+
+            with open("algorithm_params.json", "r", encoding="utf-8") as f:
+                algo_params = json.load(f)
+
+            with open(f"{INPUT_DIR}/{config_name}/time.csv", "a", encoding="utf-8") as f:
+                f.write("Algorithm,Dataset,Step,Duration\n")
+
+            for algo in args.algorithms:
+                params = algo_params[algo]
+                train = params.pop("training")
+                trainer = BaseTrainer(f"{INPUT_DIR}/{config_name}", algo, train, **params)
+                trainer.start(optim=args.optim)
+                trainers.append(trainer)
+
+    # =================================================================================================================
+    # Compute and plot results
+    # =================================================================================================================
+    if args.task in ["results", "all"]:
+        results = ResultExtractor(INPUT_DIR).fetch_results().compute_results()
+        results.to_csv(f"{INPUT_DIR}/results.csv")
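
Note on algorithm_params.json: the training loop above looks up algo_params[algo], pops the "training" entry, and forwards the remaining keys to BaseTrainer as keyword arguments. The file's actual schema is not part of this diff; the snippet below is a hypothetical sketch of the shape that loop expects, using the default "kmeans" algorithm and invented parameter names.

# Hypothetical sketch (assumption, not taken from this repository): write an
# algorithm_params.json with one entry per algorithm, each holding a "training"
# key (consumed by params.pop("training")) plus extra keyword arguments that
# main.py forwards to BaseTrainer as **params.
import json

algo_params = {
    "kmeans": {
        "training": True,   # assumed flag: whether a separate training pass is run
        "n_clusters": 8,    # invented hyperparameter, passed on as **params
    }
}

with open("algorithm_params.json", "w", encoding="utf-8") as f:
    json.dump(algo_params, f, indent=2)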