roc.py
"""
This module provides the interface needed to compute ROC scores.
"""
import json
import subprocess
from warnings import simplefilter

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score, precision_recall_curve
from sklearn.preprocessing import Normalizer

from .base import BaseResults

simplefilter("ignore", category=RuntimeWarning)


class ROCResults(BaseResults):
"""
    This class computes the ROC AUC and F1 metrics.
"""
def compute(self, auto_split=False):
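        """
        Compute ROC AUC and F1 scores for every configuration/algorithm pair and
        print the aggregated results as JSON.
        """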
def __exec(cmd) -> str:
"""
Execute a shell command and process its output as expected.
"""
return subprocess.check_output(cmd, shell=True).decode("utf-8").split()
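
        # Each configuration has its own sub-directory under self.path, holding the
        # ground-truth labels and one results directory per algorithm.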
for config in self.configs:
            config_name = config.split("/")[-1]
            if "." in config_name:
                config_name = config_name.rsplit(".", 1)[0]
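            # Ground-truth anomaly labels, shared by all evaluations of this configuration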
labels = pd.read_csv(f"{self.path}/{config_name}/dataset_labels.csv")
# Compute the score for the full_dataset
for algo in self.algos:
y_pred_path = f"{self.path}/{config_name}/results_{algo}/anomaly_scores_dataset_{algo}.ts"
roc, f1 = self.__compute_score(labels, y_pred_path)
                if algo in self.result:
                    self.result[algo][config_name] = {"classic": {"roc": round(roc, 4), "f1": round(f1, 4)}}
                else:
                    self.result[algo] = {config_name: {"classic": {"roc": round(roc, 4), "f1": round(f1, 4)}}}
                # Compute results for the automatically split dataset
                if auto_split:
                    files = __exec(f"find -L {self.path}/{config_name}/results_{algo} -regex '^.*dataset[_0-9]+_auto_split_{algo}.ts'")
                    roc, f1 = self.__compute_score(labels, files, local=True)
                    self.result[algo][config_name]["auto_split"] = {"roc": round(roc, 4), "f1": round(f1, 4)}
                # Compute results for the split dataset
                files = __exec(f"find -L {self.path}/{config_name}/results_{algo} -regex '^.*dataset[_0-9]+_{algo}.ts'")
                roc, f1 = self.__compute_score(labels, files, local=True)
                self.result[algo][config_name]["split"] = {"roc": round(roc, 4), "f1": round(f1, 4)}
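        # Dump the aggregated metrics for every algorithm/configuration pair as JSON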
        print(json.dumps(self.result))

    def __vote_for_score(self, scores, length):
"""
        Compute a score for each point of the dataset instead of on a per-window basis.
"""
scores = np.nan_to_num(scores)
results = np.zeros(length)
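        # With a stride of one, a series of `length` points produces
        # `length - w_size + 1` windows, so the window size can be recovered
        # from the number of window scores.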
w_size = length - len(scores) + 1
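        # Each point receives the mean of the window scores in its neighbourhood.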
for idx in range(length):
start = idx - w_size if idx - w_size >= 0 else 0
end = idx + w_size if idx + w_size < length else length
results[idx] = np.mean(scores[start:end])
        return results

    def __compute_score(self, labels, y_pred_path, local=False):
"""
        Compute the ROC AUC and the best F1 score of the given predictions.
"""
result = np.zeros(len(labels))
# If local is set to true, it means that we have a list of path for local scores.
# We must first retrieve all the scores and aggregate them.
if local:
norm = Normalizer()
if len(y_pred_path) == 0:
return 0, 0
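            # Normalise each score series, then keep for every point the highest
            # score observed across all sub-datasets.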
for path in y_pred_path:
y_pred = np.loadtxt(path)
y_pred = np.nan_to_num(y_pred)
                if len(y_pred) != len(labels):
                    y_pred = self.__vote_for_score(y_pred, len(labels))
                y_pred = norm.fit_transform(y_pred.reshape(1, -1)).reshape(-1)
result = np.maximum(result, y_pred)
# Otherwise, we simply have one score file, we must read it and compute the score
# for each instant.
else:
y_pred = np.loadtxt(y_pred_path)
result = np.nan_to_num(y_pred)
if len(y_pred) != len(labels):
result = self.__vote_for_score(y_pred, len(labels))
# Once the correct anomaly scores have been computed, we can compute the metrics
roc = roc_auc_score(labels, result)
prec, rec, _ = precision_recall_curve(labels, result)
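        # Evaluate F1 at every threshold of the precision-recall curve and keep the best value.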
fscore = (2 * prec * rec) / (prec + rec)
fscore = np.nan_to_num(fscore)
idx = np.argmax(fscore)
return roc, fscore[idx]
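

# Minimal usage sketch (hypothetical; it assumes BaseResults exposes the `path`,
# `configs`, `algos` and `result` attributes used above):
#
#     results = ROCResults(...)          # constructor arguments depend on BaseResults
#     results.compute(auto_split=True)   # prints {algo: {config: {...}}} as JSON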