Source code for ice.anomaly_detection.models.base

from abc import ABC, abstractmethod
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim import Adam
import optuna

from ice.base import BaseModel, SlidingWindowDataset
from ice.anomaly_detection.metrics import (
    accuracy, true_positive_rate, false_positive_rate)


class BaseAnomalyDetection(BaseModel, ABC):
    """Base class for all anomaly detection models."""

    @abstractmethod
    def __init__(
            self,
            window_size: int,
            stride: int,
            batch_size: int,
            lr: float,
            num_epochs: int,
            device: str,
            verbose: bool,
            name: str,
            random_seed: int,
            val_ratio: float,
            save_checkpoints: bool,
            threshold_level: float = 0.95
    ):
        """
        Args:
            window_size (int): The window size to train the model.
            stride (int): The time interval between the first points of
                consecutive sliding windows in training.
            batch_size (int): The batch size to train the model.
            lr (float): The learning rate to train the model.
            num_epochs (int): The number of epochs to train the model.
            device (str): The name of the device to train the model on;
                `cpu` and `cuda` are possible.
            verbose (bool): If true, show the progress bar during training.
            name (str): The name of the model for artifact storing.
            random_seed (int): Seed for random number generation to ensure
                reproducible results.
            val_ratio (float): Proportion of the dataset used for validation,
                between 0 and 1.
            save_checkpoints (bool): If true, store checkpoints.
            threshold_level (float): Takes a value from 0 to 1. It specifies
                the quantile of the distribution of errors on the training
                dataset at which the threshold value is set.
        """
        super().__init__(window_size, stride, batch_size, lr, num_epochs,
                         device, verbose, name, random_seed, val_ratio,
                         save_checkpoints)
        self.val_metrics = False

        self.threshold_level = threshold_level
        self.threshold_value = None
    def fit(self, df: pd.DataFrame, target: pd.Series = None,
            epochs: int = None, save_path: str = None,
            trial: optuna.Trial = None,
            force_model_ctreation: bool = False):
        """Fit (train) the model on a given dataset.

        Args:
            df (pandas.DataFrame): A dataframe with sensor data. The index has
                two columns: `run_id` and `sample`. All other columns are
                sensor values.
            target (pandas.Series): A series with target values. The index has
                two columns: `run_id` and `sample`. It is omitted for the
                anomaly detection task.
            epochs (int): The number of epochs for the training step. If None,
                the self.num_epochs parameter is used.
            save_path (str): Path to save checkpoints. If None, the path is
                created automatically.
        """
        if trial:
            super().fit(df, target, epochs, save_path, trial=trial,
                        force_model_ctreation=True)
        else:
            super().fit(df, target, epochs, save_path)

        # Collect reconstruction errors on the training data and set the
        # threshold at the `threshold_level` quantile of their distribution.
        error = []
        for sample, target in tqdm(
            self.dataloader,
            desc='Steps ...',
            leave=False,
            disable=(not self.verbose)
        ):
            sample = sample.to(self.device)
            with torch.no_grad():
                pred = self.model(sample)
            error.append(self.loss_no_reduction(pred, sample).mean(dim=(1, 2)))
        error = torch.concat(error)
        self.threshold_value = torch.quantile(error, self.threshold_level).item()
        if self.save_checkpoints:
            self.save_checkpoint(save_path)
    _param_conf_map = dict(
        BaseModel._param_conf_map,
        **{"threshold_level": ["MODEL", "THRESHOLD_LEVEL"]}
    )
    def load_checkpoint(self, checkpoint_path: str):
        """Load checkpoint.

        Args:
            checkpoint_path (str): Path to load checkpoint.
        """
        super().load_checkpoint(checkpoint_path)
        self.threshold_value = self._cfg['MODEL']['THRESHOLD_VALUE']
    def _prepare_for_training(self, input_dim: int, output_dim: int):
        self.optimizer = Adam(self.model.parameters(), lr=self.lr)
        self.loss_fn = nn.L1Loss()
        self.loss_no_reduction = nn.L1Loss(reduction='none')

    def _predict(self, sample: torch.Tensor) -> torch.Tensor:
        # A window is anomalous if its mean reconstruction error exceeds the
        # threshold estimated in fit().
        input = sample.to(self.device)
        output = self.model(input)
        error = self.loss_no_reduction(output, input).mean(dim=(1, 2))
        return (error > self.threshold_value).float().cpu()

    def _validate_inputs(self, df: pd.DataFrame, target: pd.Series):
        if target is not None:
            assert df.shape[0] == target.shape[0], \
                f"target is incompatible with df by length: {df.shape[0]} and {target.shape[0]}."
            assert np.all(df.index == target.index), \
                "target's index and df's index are not the same."
        assert df.index.names == ['run_id', 'sample'], \
            "An index should contain columns `run_id` and `sample`."
        assert len(df) >= self.window_size, \
            "window size is larger than the length of df."

    def _calculate_metrics(self, pred: torch.Tensor, target: torch.Tensor) -> dict:
        metrics = {
            'accuracy': accuracy(pred, target),
            'true_positive_rate': true_positive_rate(pred, target),
            'false_positive_rate': false_positive_rate(pred, target),
        }
        return metrics

    def _set_dims(self, df: pd.DataFrame, target: pd.Series):
        self.input_dim = df.shape[1]
        self.output_dim = 1
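
The detection logic above boils down to quantile thresholding of per-window reconstruction errors. The following standalone sketch is not part of the library; the toy autoencoder, tensor shapes, and variable names are illustrative assumptions. It mirrors the same steps that fit() and _predict() perform with plain PyTorch:

import torch
import torch.nn as nn

# Toy reconstruction model standing in for self.model (assumption for illustration).
model = nn.Sequential(nn.Linear(8, 4), nn.ReLU(), nn.Linear(4, 8))
loss_no_reduction = nn.L1Loss(reduction='none')

# "Training" windows: (num_windows, window_size, num_sensors), mirroring the
# sliding-window batches produced by the dataloader.
train_windows = torch.randn(256, 32, 8)
with torch.no_grad():
    recon = model(train_windows)
# Per-window reconstruction error, averaged over time and sensor dims, as in fit().
error = loss_no_reduction(recon, train_windows).mean(dim=(1, 2))

# The threshold is the `threshold_level` quantile of training errors.
threshold_level = 0.95
threshold_value = torch.quantile(error, threshold_level).item()

# At inference time, a window whose error exceeds the threshold is flagged as
# anomalous, as in _predict().
test_windows = torch.randn(16, 32, 8)
with torch.no_grad():
    test_error = loss_no_reduction(model(test_windows), test_windows).mean(dim=(1, 2))
anomaly_labels = (test_error > threshold_value).float()

By construction of the quantile, roughly 1 - threshold_level of windows drawn from the training distribution exceed the threshold, so threshold_level loosely controls the false-positive rate on normal data.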