Skip to content
Snippets Groups Projects
Commit 29380c2e authored by Pierre LOTTE's avatar Pierre LOTTE
Browse files

Generation done, trainers next

parent d1d3ca00
No related branches found
No related tags found
No related merge requests found
/output
**/__pycache__
{
"dimensions": [
{
"kind": "OSC",
"equation": {
"function": "sin",
"amplitude": 2.0,
"frequency": 1.0
},
"noise": {
"mean": 0.0,
"std": 0.1
},
"anomalies": [
{
"kind": "NOISE",
"std": 0.1,
"mean": 0.0,
"length": 5,
"position": "middle"
}
]
},
{
"kind": "CORRELATION",
"dimension": 0,
"equation": {
"step": 0.05,
"sign": -1
},
"noise": {
"mean": 0.0,
"std": 0.1
},
"anomalies": [
{
"kind": "NOISE",
"std": 0.1,
"mean": 0.0,
"length": 5,
"position": "end"
}
]
}
],
"length": 1000
}
......@@ -6,3 +6,6 @@ from .noise import NoiseAnomaly
# Registry mapping anomaly "kind" identifiers (as used in the JSON
# configuration) to the classes that implement them.
ANOMALY_CLASSES = {"NOISE": NoiseAnomaly}

# Anomaly kinds that must be injected before data generation (none registered yet).
PREPROCESS_ANOMALIES = []
# Anomaly kinds that are injected after data generation.
POSTPROCESS_ANOMALIES = ["NOISE"]
......@@ -16,6 +16,7 @@ class BaseAnomaly():
"""
self.start = 0
self.end = 0
self.labels = np.zeros(data.shape[1], dtype=np.int32)
self.length = configuration["length"]
self.params = configuration
self.data = data
......@@ -31,4 +32,20 @@ class BaseAnomaly():
"""
This method will find the starting and ending indices of the anomaly to inject.
"""
return 0, 100
total_length = self.data.shape[1]
split = total_length // 3
if self.params["position"] == "start":
self.start = np.random.randint(0, split)
elif self.params["position"] == "middle":
self.start = np.random.randint(split, 2*split)
elif self.params["position"] == "end":
self.start = np.random.randint(2*split, total_length-(self.length+1))
else:
raise ValueError(f"Unknown anomaly position {self.params['position']}")
self.end = self.start + self.length
self.labels[self.start:self.end] = 1
return self.start, self.end
......@@ -11,7 +11,8 @@ class NoiseAnomaly(BaseAnomaly):
"""
def inject(self) -> np.array:
    """
    Inject gaussian noise into this anomaly's dimension of the data.

    Draws one sample from N(mean, std) per anomalous timestamp over the
    interval chosen by ``find_anomaly_index`` and adds it in place.

    :returns: (data, labels) — the modified data array and the
        per-timestamp label vector produced by ``find_anomaly_index``.
    """
    start, end = self.find_anomaly_index()
    # end - start == self.length, so one noise sample per affected timestamp.
    noise = np.random.normal(self.params["mean"], self.params["std"], self.length)
    self.data[self.idx, start:end] += noise
    return self.data, self.labels
"""
This module provides the main class needed to generate time series
"""
from typing import Tuple
import numpy as np
from generator.dimension import DIMENSION_CLASSES
......@@ -13,20 +15,36 @@ class DatasetGenerator():
def __init__(self, config) -> None:
    """
    Build a dataset generator from a configuration dictionary.

    :param config: (dict) must contain "dimensions" (list of per-dimension
        configurations) and "length" (number of timestamps to generate).
    """
    self.dimensions = config["dimensions"]
    self.length = config["length"]
    # Training-set size is derived from `length` (3x) in generate();
    # a dedicated "training" configuration key is no longer read.
    # self.training = config["training"]
def generate(self) -> Tuple[np.array, np.array, np.array]:
    """
    Generate the test dataset, the training dataset and the anomaly labels.

    :returns: (dataset, training_dataset, labels) — datasets of shape
        (n_dimensions, length) and (n_dimensions, 3 * length), and an int32
        label vector of size `length` (1 = anomalous timestamp).
    """
    dataset = np.zeros((len(self.dimensions), self.length))
    # Training data is three times as long as the test data.
    training_dataset = np.zeros((len(self.dimensions), self.length * 3))
    dimensions = []

    for idx, dim_config in enumerate(self.dimensions):
        generator = DIMENSION_CLASSES[dim_config["kind"]](
            dim_config, dataset, training_dataset, idx
        )
        dimensions.append(generator)
        # Some anomalies must be injected before the data is generated.
        generator.inject_anomalies(step="pre")
        dataset, training_dataset = generator.generate()

    # Other anomalies can only be injected once all dimensions exist.
    for generator in dimensions:
        generator.inject_anomalies(step="post")

    # A timestamp is anomalous if it is anomalous in any dimension.
    labels = np.zeros(self.length, dtype=np.int32)
    for generator in dimensions:
        labels = np.bitwise_or(labels, generator.get_labels().astype(np.int32))
    return dataset, training_dataset, labels
def __str__(self) -> str:
    """Human-readable summary of the generator's configuration."""
    return f"Dataset(dimensions={len(self.dimensions)}, length={self.length})"
......@@ -16,9 +16,6 @@ class AffineDimension(BaseDimension):
"""
This function generates data according to the given configuration
"""
# Add anomalies
self.inject_anomalies()
# Compute values for test data
x = np.linspace(0, self.length, self.length)
self.data[self.idx] += x * self.terms["a"] + self.terms["b"]
......@@ -27,4 +24,4 @@ class AffineDimension(BaseDimension):
x = np.linspace(0, self.length, self.train_length)
self.train_data[self.idx] += x * self.terms["a"] + self.terms["b"]
return self.data, self.train_data, self.labels
return self.data, self.train_data
......@@ -5,7 +5,7 @@ from typing import Tuple
import numpy as np
from generator.anomaly import ANOMALY_CLASSES
from generator.anomaly import ANOMALY_CLASSES, PREPROCESS_ANOMALIES, POSTPROCESS_ANOMALIES
class BaseDimension():
......@@ -21,6 +21,7 @@ class BaseDimension():
are related to one another. Will not be used if not needed.
:params idx: (int) Which dimension is actually targeted by this generator instance.
"""
self.params = configuration
self.terms = configuration["equation"]
self.noise = configuration["noise"]
self.anomalies = configuration["anomalies"]
......@@ -28,7 +29,7 @@ class BaseDimension():
self.data = data
self.train_data = train_data
self.train_length = train_data.shape[1]
self.labels = np.zeros(self.length)
self.labels = np.zeros(self.length, dtype=np.int32)
self.idx = idx
def generate(self) -> Tuple[np.array, np.array, np.array]:
......@@ -49,14 +50,26 @@ class BaseDimension():
noise = np.random.normal(self.noise["mean"], self.noise["std"], self.length)
return self.data[self.idx] + noise
def inject_anomalies(self) -> np.array:
def get_labels(self) -> np.array:
    """Expose the per-timestamp anomaly labels of this dimension."""
    return self.labels
def inject_anomalies(self, step: str = "post") -> np.array:
    """
    Inject this dimension's configured anomalies into the data.

    Called twice per dimension — once with step="pre" (before data
    generation) and once with step="post" (after) — so each anomaly is
    injected exactly once, at the stage its kind is registered for.

    :param step: injection stage, "pre" or "post".
    :returns: the (possibly modified) data array.
    """
    def __inject(anomaly):
        # The anomaly object modifies the data and returns it together
        # with its own per-timestamp label vector.
        data, labels = ANOMALY_CLASSES[anomaly["kind"]](anomaly, self.data, self.idx).inject()
        self.data = data
        # OR-combine so labels from previously injected anomalies are kept.
        self.labels = np.bitwise_or(self.labels, labels)

    for anomaly in self.anomalies:
        kind = anomaly["kind"]
        if step == "pre" and kind in PREPROCESS_ANOMALIES:
            __inject(anomaly)
        elif step == "post" and kind in POSTPROCESS_ANOMALIES:
            __inject(anomaly)
    return self.data
......@@ -8,11 +8,24 @@ from .base import BaseDimension
class CorrelationDimension(BaseDimension):
    """
    A dimension correlated with another one: it follows an affine-like walk
    that goes up when the watched dimension goes up and down when it goes
    down, moving by a fixed step at every timestamp.
    """

    def generate(self) -> np.array:
        """
        Generate test and training data by following the sign of the
        correlated dimension's increments.

        :returns: (data, train_data) with this dimension's rows filled in.
        """
        corr_idx = self.params["dimension"]
        self.data[self.idx] = self._follow(self.data[corr_idx])
        self.train_data[self.idx] = self._follow(self.train_data[corr_idx])
        return self.data, self.train_data

    def _follow(self, reference: np.array) -> np.array:
        """Series starting at 0 that moves by sign * step with each increment of `reference`."""
        increments = self.terms["sign"] * np.sign(np.diff(reference)) * self.terms["step"]
        return np.cumsum(np.insert(increments, 0, 0))
......@@ -27,16 +27,22 @@ class OscillatingDimension(BaseDimension):
"hacovercosine": lambda x: (1 + np.sin(x)) / 2,
}
def generate(self) -> Tuple[np.array, np.array]:
    """
    Generate oscillating test and training series for this dimension.

    Both series use the same amplitude and frequency; the test series gets
    a random phase offset so it does not exactly replicate the training
    series.
    NOTE(review): reconstructed from a garbled diff — the offset was applied
    to `self.data`; confirm which series the offset is intended for.

    :returns: (data, train_data) with this dimension's rows filled in.
    """
    offset = np.pi * np.random.random_sample()
    # Prepare computing variables
    dt = .01
    func = self.OSCILLATING_FUNCTIONS[self.terms["function"]]
    # Generate testing data: constant angular increment per timestamp.
    steps = np.zeros(self.length)
    steps += 2 * np.pi * self.terms["frequency"] * dt
    self.data[self.idx] = self.terms["amplitude"] * func(np.cumsum(steps) + offset)
    # Generate training data (no phase offset).
    steps = np.zeros(self.train_length)
    steps += 2 * np.pi * self.terms["frequency"] * dt
    self.train_data[self.idx] = self.terms["amplitude"] * func(np.cumsum(steps))
    return self.data, self.train_data
"""
This module is the entrypoint for the generation of time series.
"""
import json
import os
from argparse import ArgumentParser
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from generator import DatasetGenerator
plt.rcParams["figure.figsize"] = (20, 10)
if __name__ == "__main__":
# =================================================================================================================
# Fetch arguments from CLI
......@@ -20,23 +31,75 @@ if __name__ == "__main__":
choices=["generate", "train", "results", "all"]
)
parser.add_argument("-a", "--algorithm", help="Which algorithm to train.", default="kmeans")
parser.add_argument("-o", "--output", help="Output directory", default="output/")
parser.add_argument("-i", "--input", help="Input directory. Only to be used when no data will be generated")
parser.add_argument("-o", "--output", help="Output directory")
# Load args
args = parser.parse_args()
# Prepare output directory
if args.output is None:
OUTPUT_DIR = f"output/{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
else:
OUTPUT_DIR = args.output
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Prepare input directory if needed
if args.task not in ["generate", "all"] and args.input is None:
raise ValueError("Impossible to skip data generation and not give an input directory where data should be fetched")
elif args.task not in ["generate", "all"]:
INPUT_DIR = args.input
else:
INPUT_DIR = OUTPUT_DIR
# =================================================================================================================
# Generate Data
# =================================================================================================================
# Load config file
config = {}
for config_file in args.config:
# Compute config name
config_name = config_file.split("/")[-1][:-5]
# Data generation
data = DatasetGenerator(config)
if args.task in ["generate", "all"]:
# Create output dir
os.makedirs(f"{OUTPUT_DIR}/{config_name}")
# Read config file
with open(config_file, "r", encoding="utf-8") as f:
config = json.load(f)
# Data generation
dataset, train_dataset, labels = DatasetGenerator(config).generate()
# Save data to disk
# Prepare the data
columns = list(range(0, dataset.shape[0]))
df_test = pd.DataFrame(data=dataset.T, index=list(range(0, dataset.shape[1])), columns=columns)
df_test["is_anomaly"] = labels
df_train = pd.DataFrame(data=train_dataset.T, index=list(range(0, train_dataset.shape[1])), columns=columns)
df_train["is_anomaly"] = np.zeros(train_dataset.shape[1])
df_test.to_csv(f"{OUTPUT_DIR}/{config_name}/dataset.csv", index_label="Timestamp")
df_train.to_csv(f"{OUTPUT_DIR}/{config_name}/dataset_train.csv", index_label="Timestamp")
pd.DataFrame(data=labels).to_csv(f"{OUTPUT_DIR}/{config_name}/dataset_labels.csv", index=False)
# Plot data and save it to disk
for dimension in dataset:
plt.plot(dimension)
plt.savefig(f"{OUTPUT_DIR}/{config_name}/dataset.png")
plt.clf()
for dimension in train_dataset:
plt.plot(dimension)
plt.savefig(f"{OUTPUT_DIR}/{config_name}/train_dataset.png")
plt.clf()
# =================================================================================================================
# Train algorithm
# =================================================================================================================
if args.task in ["train", "all"]:
# =================================================================================================================
# Compute and plot results
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment