"""
This module provides the interface needed to compute ROC scores.
"""
import json
import subprocess
from warnings import simplefilter

import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score, precision_recall_curve
from sklearn.preprocessing import Normalizer

from .base import BaseResults

simplefilter("ignore", category=RuntimeWarning)
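
# Minimal usage sketch (assumption: the BaseResults base class, defined in .base,
# provides the `path`, `configs`, `algos` and `result` attributes used below; its
# exact constructor signature is not shown in this file):
#
#     results = ROCResults(...)  # constructor arguments depend on BaseResults
#     results.compute(auto_split=True)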

class ROCResults(BaseResults):
    """
    This class will compute the ROC metric.
    """
    def compute(self, auto_split=False):
        def __exec(cmd) -> list:
            """
            Execute a shell command and return its whitespace-split output as a list of strings.
            """
            return subprocess.check_output(cmd, shell=True).decode("utf-8").split()

        for config in self.configs:
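            # Keep only the config file name and strip a 5-character extension (e.g. ".json")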
            config_name = config.split("/")[-1][:-5] if "." in config else config.split("/")[-1]
            labels = pd.read_csv(f"{self.path}/{config_name}/dataset_labels.csv")

            # Compute the score for the full_dataset
            for algo in self.algos:
                y_pred_path = f"{self.path}/{config_name}/results_{algo}/anomaly_scores_dataset_{algo}.ts"
                roc, f1 = self.__compute_score(labels, y_pred_path)

                # Results are stored as result[algo][config_name][mode], where mode is
                # "classic", "split" or "auto_split"
                if algo in self.result:
                    self.result[algo][config_name] = {"classic": {"roc": round(roc, 4), "f1": round(f1, 4)}}
                else:
                    self.result[algo] = {config_name: {"classic": {"roc": round(roc, 4), "f1": round(f1, 4)}}}

                # Compute results for the automatically split dataset
                if auto_split:
                    files = __exec(f"find -L {self.path}/{config_name}/results_{algo} -regex '^.*dataset[_0-9]+_auto_split_{algo}.ts'")
                    roc, f1 = self.__compute_score(labels, files, local=True)
                    self.result[algo][config_name]["auto_split"] = {"roc": round(roc, 4), "f1": round(f1, 4)}

                # Compute results for the split dataset
                files = __exec(f"find -L {self.path}/{config_name}/results_{algo} -regex '^.*dataset[_0-9]+_{algo}.ts'")

                roc, f1 = self.__compute_score(labels, files, local=True)
                self.result[algo][config_name]["split"] = {"roc": round(roc, 4), "f1": round(f1, 4)}

        print(json.dumps(self.result))

    def __vote_for_score(self, scores, length):
        """
        Compute the score for each point of the dataset instead of a per window basis.
        """
        scores = np.nan_to_num(scores)
        results = np.zeros(length)
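        # One score per sliding-window position is assumed, i.e.
        # len(scores) == length - w_size + 1, so the window size is recovered from the score count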
        w_size = length - len(scores) + 1

        for idx in range(length):
            start = max(idx - w_size, 0)
            end = min(idx + w_size, length)

            results[idx] = np.mean(scores[start:end])

        return results

    def __compute_score(self, labels, y_pred_path, local=False):
        """
        This function computes the roc and F1 score of the given predictions
        """
        result = np.zeros(len(labels))

        # If local is True, y_pred_path is a list of paths to per-split score files.
        # We must first retrieve all the scores and aggregate them.
        if local:
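            # Each split's score vector is rescaled to unit L2 norm (the Normalizer
            # default) so that splits with different score magnitudes can be aggregated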
            norm = Normalizer()

            if len(y_pred_path) == 0:
                return 0, 0

            for path in y_pred_path:
                y_pred = np.loadtxt(path)
                y_pred = np.nan_to_num(y_pred)

                if len(y_pred) != len(labels):
                    y_pred = self.__vote_for_score(y_pred, len(labels))

                y_pred = norm.fit_transform(y_pred.reshape(1, -1)).reshape(-1)
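                # Keep, for each time step, the highest normalized score across all splits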
                result = np.maximum(result, y_pred)
        # Otherwise, we have a single score file: read it and compute the score
        # for each instant.
        else:
            y_pred = np.loadtxt(y_pred_path)
            result = np.nan_to_num(y_pred)

            if len(y_pred) != len(labels):
                result = self.__vote_for_score(y_pred, len(labels))

        # Once the correct anomaly scores have been computed, we can compute the metrics
        roc = roc_auc_score(labels, result)
        prec, rec, _ = precision_recall_curve(labels, result)

        # Evaluate F1 at every threshold of the precision-recall curve; thresholds where
        # precision + recall == 0 produce NaN, which is mapped to 0
        fscore = (2 * prec * rec) / (prec + rec)
        fscore = np.nan_to_num(fscore)

        idx = np.argmax(fscore)

        return roc, fscore[idx]