Source code for ice.base

from abc import ABC, abstractmethod
from typing import Any
import pandas as pd
import numpy as np
import torch
from tqdm.auto import tqdm, trange
import os
import zipfile
import requests
import datetime
import json
import random
from ice.configs import Config
import time
from torch.utils.data import DataLoader, Dataset, random_split
import optuna


class BaseDataset(ABC):
    """Base class for datasets."""

    def __init__(self, num_chunks=None, force_download=False):
        """
        Args:
            num_chunks (int): If given, download only num_chunks chunks of data.
                Used for testing purposes.
            force_download (bool): If True, download the dataset even if it exists.
        """
        self.df = None
        self.target = None
        self.train_mask = None
        self.test_mask = None
        self.name = None
        self.public_link = None
        self.set_name_public_link()
        self._load(num_chunks, force_download)

    def _load(self, num_chunks, force_download):
        """Load the dataset into self.df and self.target."""
        ref_path = f"data/{self.name}/"
        if not os.path.exists(ref_path):
            os.makedirs(ref_path)
        zfile_path = f"data/{self.name}.zip"
        url = self._get_url(self.public_link)
        if not os.path.exists(zfile_path) or force_download:
            self._download_pgbar(url, zfile_path, self.name, num_chunks)
        self._extracting_files(zfile_path, ref_path)
        self.df = self._read_csv_pgbar(
            ref_path + "df.csv", index_col=["run_id", "sample"]
        )
        self.target = self._read_csv_pgbar(
            ref_path + "target.csv", index_col=["run_id", "sample"]
        )["target"]
        self.train_mask = self._read_csv_pgbar(
            ref_path + "train_mask.csv", index_col=["run_id", "sample"]
        )["train_mask"]
        self.train_mask = self.train_mask.astype(bool)
        self.test_mask = ~self.train_mask

    def _get_url(self, public_link):
        url = ""
        r = requests.get(
            f"https://cloud-api.yandex.net/v1/disk/public/resources?public_key={public_link}"
        )
        if r.status_code == 200:
            url = r.json()["file"]
        else:
            raise Exception(r.json()["description"])
        return url

    def _read_csv_pgbar(self, csv_path, index_col, chunk_size=1024 * 100):
        rows = sum(1 for _ in open(csv_path, "r")) - 1
        chunk_list = []
        with tqdm(total=rows, desc=f"Reading {csv_path}") as pbar:
            for chunk in pd.read_csv(
                csv_path, index_col=index_col, chunksize=chunk_size
            ):
                chunk_list.append(chunk)
                pbar.update(len(chunk))
        df = pd.concat(chunk_list, axis=0)
        return df

    def _download_pgbar(self, url, zfile_path, fname, num_chunks):
        resp = requests.get(url, stream=True)
        total = int(resp.headers.get("Content-Length"))
        with open(zfile_path, "wb") as file:
            with tqdm(
                total=total,
                desc=f"Downloading {fname}",
                unit="B",
                unit_scale=True,
                unit_divisor=1024,
            ) as pbar:
                i = 0
                for data in resp.iter_content(chunk_size=1024):
                    if num_chunks is not None and num_chunks == i:
                        break
                    file.write(data)
                    pbar.update(len(data))
                    i += 1

    def _extracting_files(self, zfile_path, ref_path, block_size=1024 * 10000):
        with zipfile.ZipFile(zfile_path, "r") as zfile:
            for entry_info in zfile.infolist():
                if os.path.exists(ref_path + entry_info.filename):
                    continue
                input_file = zfile.open(entry_info.filename)
                target_file = open(ref_path + entry_info.filename, "wb")
                block = input_file.read(block_size)
                with tqdm(
                    total=entry_info.file_size,
                    desc=f"Extracting {entry_info.filename}",
                    unit="B",
                    unit_scale=True,
                    unit_divisor=1024,
                ) as pbar:
                    while block:
                        target_file.write(block)
                        pbar.update(len(block))
                        block = input_file.read(block_size)
                input_file.close()
                target_file.close()
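A minimal sketch (not part of ice.base) of a concrete dataset: __init__ calls set_name_public_link(), so a child class is expected to fill in self.name and self.public_link (a Yandex Disk public link, judging by _get_url) before _load() downloads and unpacks the data. The names below are assumptions.

class ExampleDataset(BaseDataset):
    """Hypothetical dataset; files are downloaded to data/example_dataset/."""

    def set_name_public_link(self):
        self.name = "example_dataset"  # hypothetical dataset name
        self.public_link = "https://disk.yandex.ru/d/example"  # hypothetical public link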
class BaseModel(ABC):
    """Base class for all models."""

    _param_conf_map = {
        "batch_size": ["DATASET", "BATCH_SIZE"],
        "lr": ["OPTIMIZATION", "LR"],
        "num_epochs": ["OPTIMIZATION", "NUM_EPOCHS"],
        "verbose": ["VERBOSE"],
        "device": ["DEVICE"],
        "name": ["EXPERIMENT_NAME"],
        "window_size": ["MODEL", "WINDOW_SIZE"],
        "stride": ["MODEL", "STRIDE"],
        "val_ratio": ["DATASET", "VAL_RATIO"],
        "random_seed": ["SEED"],
    }

    @abstractmethod
    def __init__(
        self,
        window_size: int,
        stride: int,
        batch_size: int,
        lr: float,
        num_epochs: int,
        device: str,
        verbose: bool,
        name: str,
        random_seed: int,
        val_ratio: float,
        save_checkpoints: bool,
    ):
        """
        Args:
            window_size (int): The window size to train the model.
            stride (int): The time interval between first points of consecutive
                sliding windows in training.
            batch_size (int): The batch size to train the model.
            lr (float): The learning rate to train the model.
            num_epochs (int): The number of epochs to train the model.
            device (str): The name of a device to train the model. `cpu` and
                `cuda` are possible.
            verbose (bool): If True, show the progress bar in training.
            name (str): The name of the model for artifact storing.
            random_seed (int): Seed for random number generation to ensure
                reproducible results.
            val_ratio (float): Proportion of the dataset used for validation,
                between 0 and 1.
            save_checkpoints (bool): If True, store checkpoints.
        """
        self._cfg = Config()
        self._cfg.path_set(["MODEL", "NAME"], self.__class__.__name__)
        self._output_dir = "outputs"

        self.window_size = window_size
        self.val_ratio = val_ratio
        self.val_metrics = None
        self.stride = stride
        self.batch_size = batch_size
        self.lr = lr
        self.num_epochs = num_epochs
        self.device = device
        self.verbose = verbose
        self.model = None
        self.name = name
        self.random_seed = random_seed
        self.save_checkpoints = save_checkpoints

        self.input_dim = None
        self.output_dim = None
        self.train_time = "no date"
        self.checkpoint_epoch = 0
        self.direction = "minimize"

    def __setattr__(self, __name: str, __value: Any):
        if (
            __name not in ["cfg", "param_conf_map"]
            and __name in self._param_conf_map.keys()
        ):
            self._cfg.path_set(self._param_conf_map[__name], __value)
        super().__setattr__(__name, __value)
        if __name == "name":
            self._training_path, self._inference_path, self._checkpoints_path = (
                self._initialize_paths()
            )
    @classmethod
    def from_config(cls, cfg: Config):
        """Create an instance of the model class with parameters from a config.

        Args:
            cfg (Config): A config with the model's parameters.

        Returns:
            BaseModel: Instance of a BaseModel child class initialized with
                parameters from the config.
        """
        param_dict = {}
        for key in cls._param_conf_map.keys():
            param_dict[key] = cfg.path_get(cls._param_conf_map[key])
        return cls(**param_dict)
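    # Illustrative usage sketch (not part of ice.base), assuming SomeModel is a
    # concrete child of BaseModel and `cfg` is a Config holding every key listed
    # in _param_conf_map, e.g. read back from the config.yaml written by
    # _store_atrifacts_train (the exact loading call is an assumption):
    #
    #     model = SomeModel.from_config(cfg)  # each parameter is read via cfg.path_get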
    def fit(
        self,
        df: pd.DataFrame,
        target: pd.Series = None,
        epochs: int = None,
        save_path: str = None,
        trial: optuna.Trial = None,
        force_model_ctreation: bool = False,
    ):
        """Fit (train) the model on a given dataset.

        Args:
            df (pandas.DataFrame): A dataframe with sensor data. Index has two
                columns: `run_id` and `sample`. All other columns are sensor values.
            target (pandas.Series): A series with target values. Index has two
                columns: `run_id` and `sample`. It is omitted for the anomaly
                detection task.
            epochs (int): The number of epochs for the training step. If None,
                the self.num_epochs parameter is used.
            save_path (str): Path to save checkpoints. If None, the path is
                created automatically.
            trial (optuna.Trial, None): optuna.Trial object created by the
                optimize method.
            force_model_ctreation (bool): If True, force fit to create a new
                model for an optimization study.
        """
        self.train_time = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        if epochs is None:
            epochs = self.num_epochs
        self._validate_inputs(df, target)
        if self.model is None or force_model_ctreation:
            self._set_dims(df, target)
            self._create_model(self.input_dim, self.output_dim)
            assert self.model is not None, "Model creation error."
            self._prepare_for_training(self.input_dim, self.output_dim)
        self._train_nn(
            df=df, target=target, epochs=epochs, save_path=save_path, trial=trial
        )
        self._store_atrifacts_train()
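    # Illustrative usage sketch (not part of ice.base), assuming ExampleDataset and
    # SomeModel are hypothetical concrete children of BaseDataset and BaseModel:
    #
    #     dataset = ExampleDataset()
    #     model = SomeModel(...)  # constructor arguments as documented in __init__
    #     model.fit(dataset.df[dataset.train_mask], dataset.target[dataset.train_mask])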
    @torch.no_grad()
    def predict(self, sample: torch.Tensor) -> torch.Tensor:
        """Make a prediction for a given batch of samples.

        Args:
            sample (torch.Tensor): A tensor of the shape (B, L, C), where B is
                the batch size, L is the sequence length, C is the number of
                sensors.

        Returns:
            torch.Tensor: A tensor with predictions of the shape (B,).
        """
        self.model.eval()
        self.model.to(self.device)
        return self._predict(sample)
    def optimize(
        self,
        df: pd.DataFrame,
        target: pd.Series = None,
        optimize_parameter: str = "batch_size",
        optimize_range: tuple = (128, 256),
        direction: str = "minimize",
        n_trials: int = 5,
        epochs: int = None,
        optimize_metric: str = None,
    ):
        """Run an optuna study to find the best hyperparameter value on the validation dataset.

        Args:
            df (pd.DataFrame): DataFrame passed to the fit method.
            target (pd.Series, optional): Target pd.Series passed to the fit method.
                Defaults to None.
            optimize_parameter (str, optional): Model parameter to optimize.
                Defaults to 'batch_size'.
            optimize_range (tuple, optional): Model parameter range for optuna trials.
                Defaults to (128, 256).
            direction (str): "minimize" or "maximize" the target of hyperparameter
                optimization.
            n_trials (int, optional): Number of trials. Defaults to 5.
            epochs (int, optional): Number of epochs passed to the fit method.
                Defaults to None.
            optimize_metric (str): Metric on the validation dataset to use as the
                target of hyperparameter optimization.
        """
        param_type = type(self.__dict__[optimize_parameter])
        self.direction = direction
        defaults_torch_backends = (
            torch.backends.cudnn.deterministic,
            torch.backends.cudnn.benchmark,
        )
        self.dump = self._training_path

        # make torch behavior deterministic
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
        torch.use_deterministic_algorithms(True)
        os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":16:8"

        if not epochs:
            epochs = self.num_epochs

        def objective(trial):
            """Optuna objective.

            Args:
                trial (optuna.Trial): optuna trial object.

            Raises:
                AssertionError: If the optimize_parameter value is not numerical.

            Returns:
                float: Best validation loss (or metric) to perform the optimization step.
            """
            self._training_path = self.dump
            if param_type == float:
                suggested_param = trial.suggest_float(
                    optimize_parameter, optimize_range[0], optimize_range[1]
                )
            elif param_type == int:
                suggested_param = trial.suggest_int(
                    optimize_parameter, optimize_range[0], optimize_range[1]
                )
            elif optimize_parameter == "lr":
                suggested_param = trial.suggest_loguniform(
                    optimize_parameter, optimize_range[0], optimize_range[1]
                )
            else:
                raise AssertionError(f"{optimize_parameter} is not an int or float value")
            setattr(self, optimize_parameter, suggested_param)
            if optimize_metric:
                self.val_metrics = True
            self._training_path = (
                self._training_path + f"/parameter_{optimize_parameter} optimization"
            )
            os.makedirs(self._training_path, exist_ok=True)
            self.checkpoint_epoch = 0
            print(f"trial step with {optimize_parameter} = {suggested_param}")
            self.fit(
                df=df,
                epochs=epochs,
                target=target,
                trial=trial,
                force_model_ctreation=True,
            )
            # use the best key metric on the validation dataset from training;
            # if there is no such metric, use the best validation loss
            if optimize_metric:
                return self.best_validation_metrics[optimize_metric]
            else:
                return self.best_val_loss

        study = optuna.create_study(
            direction=direction,
            pruner=optuna.pruners.PercentilePruner(25.0),
            study_name=f"/parameter_{optimize_parameter} study",
        )
        study.optimize(objective, n_trials=n_trials)
        self._training_path = self.dump
        print(f"Best hyperparameters: {study.best_params}")
        print(f"Best trial: {study.best_trial}")
        df_trials = study.trials_dataframe()
        df_trials.to_csv(
            self._training_path
            + f"/parameter_{optimize_parameter} optimization"
            + f"/parameter_{optimize_parameter}.csv",
            index=False,
        )

        # restore standard torch deterministic values
        torch.backends.cudnn.deterministic, torch.backends.cudnn.benchmark = (
            defaults_torch_backends
        )
        del os.environ["CUBLAS_WORKSPACE_CONFIG"]
        torch.use_deterministic_algorithms(False)
    @torch.no_grad()
    def evaluate(self, df: pd.DataFrame, target: pd.Series) -> dict:
        """Evaluate the model and compute metrics on a given dataset.

        Args:
            df (pandas.DataFrame): A dataframe with sensor data. Index has two
                columns: `run_id` and `sample`. All other columns are sensor values.
            target (pandas.Series): A series with target values. Index has two
                columns: `run_id` and `sample`.

        Returns:
            dict: A dictionary with metrics where keys are names of metrics and
                values are values of metrics.
        """
        self._validate_inputs(df, target)
        dataset = SlidingWindowDataset(df, target, window_size=self.window_size)
        self.dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=False)
        target, pred = [], []
        for sample, _target in tqdm(
            self.dataloader, desc="Steps ...", leave=False, disable=(not self.verbose)
        ):
            sample = sample.to(self.device)
            target.append(_target)
            pred.append(self.predict(sample))
        target = torch.concat(target).numpy()
        pred = torch.concat(pred).numpy()
        metrics = self._calculate_metrics(pred, target)
        self._store_atrifacts_inference(metrics)
        return metrics
    @torch.no_grad()
    def model_param_estimation(self):
        """Calculate the number of self.model parameters and the mean and std of inference time.

        Returns:
            tuple: A tuple containing the number of parameters in the model and
                the mean and standard deviation of model inference time.
        """
        assert (
            self.model is not None
        ), "use model.fit() to create a fitted model object first"
        sample = next(iter(self.dataloader))
        if len(sample) == 2:
            x, y = sample
        else:
            x = sample
        dummy_input = torch.randn(1, x.shape[1], x.shape[2]).to(self.device)
        repetitions = 500
        times = np.zeros((repetitions, 1))
        if self.device == "cuda":
            starter, ender = torch.cuda.Event(enable_timing=True), torch.cuda.Event(
                enable_timing=True
            )
            for i in range(repetitions):
                starter.record()
                _ = self.model(dummy_input)
                ender.record()
                torch.cuda.synchronize()
                curr_time = starter.elapsed_time(ender)
                times[i] = curr_time
        else:
            for i in range(repetitions):
                start_time = time.time()
                _ = self.model(dummy_input)
                end_time = time.time()
                curr_time = (end_time - start_time) * 1000  # Convert to milliseconds
                times[i] = curr_time
        mean_inference_time = np.sum(times) / repetitions
        std_inference_time = np.std(times)
        num_params = sum(p.numel() for p in self.model.parameters())
        return num_params, (mean_inference_time, std_inference_time)
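    # Illustrative usage sketch (not part of ice.base): model_param_estimation relies
    # on self.dataloader, which is set by fit() or evaluate(), so call it afterwards.
    # Inference times are reported in milliseconds:
    #
    #     num_params, (mean_ms, std_ms) = model.model_param_estimation()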
    @abstractmethod
    def _create_model(self, input_dim: int, output_dim: int):
        """
        This method has to be implemented by all children. Create a torch model
        for training and prediction.
        """
        pass

    @abstractmethod
    def _prepare_for_training(self, input_dim: int, output_dim: int):
        """
        This method has to be implemented by all children. Prepare the model
        for training by a given dataset.
        """
        pass

    @abstractmethod
    def _calculate_metrics(self, pred: torch.Tensor, target: torch.Tensor) -> dict:
        """
        This method has to be implemented by all children. Calculate metrics.
        """
        pass

    @abstractmethod
    def _predict(self, sample: torch.Tensor) -> torch.Tensor:
        """
        This method has to be implemented by all children. Make a prediction
        for a given batch of samples.
        """
        pass

    @abstractmethod
    def _set_dims(self, df: pd.DataFrame, target: pd.Series):
        """
        This method has to be implemented by all children. Calculate input and
        output dimensions of the model by the given dataset.
        """
        pass

    def _validate_inputs(self, df: pd.DataFrame, target: pd.Series):
        assert (
            df.shape[0] == target.shape[0]
        ), f"target is incompatible with df by the length: {df.shape[0]} and {target.shape[0]}."
        assert np.all(
            df.index == target.index
        ), "target's index and df's index are not the same."
        assert df.index.names == (
            ["run_id", "sample"]
        ), "An index should contain columns `run_id` and `sample`."
        assert (
            len(df) >= self.window_size
        ), "window size is larger than the length of df."
        assert len(df) >= self.stride, "stride is larger than the length of df."

    def _train_nn(
        self,
        df: pd.DataFrame,
        target: pd.Series,
        epochs: int,
        save_path: str,
        trial: optuna.Trial,
    ):
        self.model.train()
        self.model.to(self.device)
        self.best_val_loss = (
            float("inf") if self.direction == "minimize" else float("-inf")
        )
        self.best_validation_metrics = {}
        self._set_seed()

        dataset = SlidingWindowDataset(
            df, target, window_size=self.window_size, stride=self.stride
        )
        val_size = max(int(len(dataset) * self.val_ratio), 1)
        train_size = len(dataset) - val_size
        train_dataset, val_dataset = random_split(
            dataset,
            [train_size, val_size],
            generator=torch.Generator().manual_seed(self.random_seed),
        )
        self.dataloader = DataLoader(
            train_dataset, batch_size=self.batch_size, shuffle=True
        )
        self.val_dataloader = DataLoader(
            val_dataset, batch_size=self.batch_size, shuffle=False
        )

        for e in trange(
            self.checkpoint_epoch,
            self.checkpoint_epoch + epochs,
            desc="Epochs ...",
            disable=(not self.verbose),
        ):
            for sample, target in tqdm(
                self.dataloader,
                desc="Steps ...",
                leave=False,
                disable=(not self.verbose),
            ):
                sample = sample.to(self.device)
                target = target.to(self.device)
                logits = self.model(sample)
                loss = self.loss_fn(logits, target)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
            if self.verbose:
                print(f"Epoch {e+1}, Loss: {loss.item():.4f}")
            self.checkpoint_epoch = e + 1
            if self.save_checkpoints:
                self.save_checkpoint(save_path)

            val_loss, val_metrics = self._validate_nn()
            if self.direction == "minimize":
                self.best_val_loss = (
                    val_loss if val_loss < self.best_val_loss else self.best_val_loss
                )
            else:
                self.best_val_loss = (
                    val_loss if val_loss > self.best_val_loss else self.best_val_loss
                )
            if self.val_metrics:
                for key, value in val_metrics.items():
                    if key not in self.best_validation_metrics:
                        self.best_validation_metrics[key] = value
                    else:
                        if self.direction == "minimize":
                            self.best_validation_metrics[key] = (
                                value
                                if self.best_validation_metrics[key] > value
                                else self.best_validation_metrics[key]
                            )
                        else:
                            self.best_validation_metrics[key] = (
                                value
                                if self.best_validation_metrics[key] < value
                                else self.best_validation_metrics[key]
                            )
            if self.verbose:
                if self.val_metrics:
                    print(
                        f"Epoch {e+1}, Validation Loss: {val_loss:.4f}, Metrics: {val_metrics}"
                    )
                else:
                    print(f"Epoch {e+1}, Validation Loss: {val_loss:.4f}")
            if trial:
                trial.report(val_loss, self.checkpoint_epoch)
                if trial.should_prune():
                    raise optuna.exceptions.TrialPruned()

    def _validate_nn(self):
        self.model.eval()
        val_loss = 0
        target_list, pred_list = [], []
        with torch.no_grad():
            for sample, target in self.val_dataloader:
                sample = sample.to(self.device)
                target = target.to(self.device)
                logits = self.model(sample)
                loss = self.loss_fn(logits, target)
                val_loss += loss.item()
                if self.val_metrics:
                    pred = self.predict(sample)
                    target_list.append(target.cpu())
                    pred_list.append(pred.cpu())
        val_loss /= len(self.val_dataloader)
        if self.val_metrics:
            target_tensor = torch.cat(target_list).numpy()
            pred_tensor = torch.cat(pred_list).numpy()
            val_metrics = self._calculate_metrics(pred_tensor, target_tensor)
        else:
            val_metrics = None
        self.model.train()
        return val_loss, val_metrics

    def _set_seed(self):
        torch.manual_seed(self.random_seed)
        random.seed(self.random_seed)
        np.random.seed(self.random_seed)

    def _initialize_paths(self):
        artifacts_path = os.path.join(self._output_dir, self.name)
        training_path = os.path.join(artifacts_path, "training")
        inference_path = os.path.join(artifacts_path, "inference")
        checkpoints_path = os.path.join(artifacts_path, "checkpoints")
        return training_path, inference_path, checkpoints_path

    def _store_atrifacts_train(self):
        save_path = os.path.join(self._training_path, self.train_time)
        os.makedirs(save_path, exist_ok=True)
        self._cfg.to_yaml(os.path.join(save_path, "config.yaml"))

    def _store_atrifacts_inference(self, metrics):
        save_path = os.path.join(self._inference_path, self.train_time)
        os.makedirs(save_path, exist_ok=True)
        self._cfg.to_yaml(os.path.join(save_path, "config.yaml"))
        with open(os.path.join(save_path, "metrics.json"), "w") as json_file:
            json.dump(metrics, json_file)
    def save_checkpoint(self, save_path: str = None):
        """Save checkpoint.

        Args:
            save_path (str): Path to save checkpoint.
        """
        if save_path is None:
            checkpoints_path = os.path.join(self._checkpoints_path, self.train_time)
            os.makedirs(checkpoints_path, exist_ok=True)
            file_path = self.name + "_epoch_" + str(self.checkpoint_epoch)
            save_path = os.path.join(checkpoints_path, file_path + ".tar")
        torch.save(
            {
                "config": self._cfg,
                "epoch": self.checkpoint_epoch,
                "input_dim": self.input_dim,
                "output_dim": self.output_dim,
                "model_state_dict": self.model.state_dict(),
                "optimizer_state_dict": self.optimizer.state_dict(),
            },
            save_path,
        )
    def load_checkpoint(self, checkpoint_path: str):
        """Load checkpoint.

        Args:
            checkpoint_path (str): Path to load checkpoint.
        """
        checkpoint = torch.load(checkpoint_path, map_location=self.device)
        self._cfg = checkpoint["config"]
        self.input_dim = checkpoint["input_dim"]
        self.output_dim = checkpoint["output_dim"]
        self._create_model(self.input_dim, self.output_dim)
        self.model.to(self.device)
        assert self.model is not None, "Model creation error."
        self._prepare_for_training(self.input_dim, self.output_dim)
        self.model.load_state_dict(checkpoint["model_state_dict"])
        self.optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
        self.checkpoint_epoch = checkpoint["epoch"]
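    # Illustrative usage sketch (not part of ice.base): resuming training from a
    # checkpoint written by save_checkpoint. `SomeModel` and `ckpt_path` are
    # assumptions; the .tar file layout matches the dict saved above:
    #
    #     model = SomeModel(...)             # hypothetical concrete BaseModel child
    #     model.load_checkpoint(ckpt_path)   # restores weights, optimizer state and epoch
    #     model.fit(df, target, epochs=5)    # training continues from checkpoint_epoch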
class SlidingWindowDataset(Dataset):
    """Sliding-window view over a (run_id, sample)-indexed dataframe of sensor values."""

    def __init__(
        self, df: pd.DataFrame, target: pd.Series, window_size: int, stride: int = 1
    ):
        self.df = df
        self.target = target
        self.window_size = window_size
        window_end_indices = []
        run_ids = df.index.get_level_values(0).unique()
        for run_id in tqdm(run_ids, desc="Creating sequence of samples"):
            indices = np.array(df.index.get_locs([run_id]))
            indices = indices[self.window_size :: stride]
            window_end_indices.extend(indices)
        self.window_end_indices = np.array(window_end_indices)

    def __len__(self):
        return len(self.window_end_indices)

    def __getitem__(self, idx):
        window_index = self.window_end_indices[idx]
        sample = self.df.values[window_index - self.window_size : window_index]
        if self.target is not None:
            target = self.target.values[window_index]
        else:
            target = sample.astype("float32")
        return sample.astype("float32"), target
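A minimal usage sketch (not part of ice.base), assuming df is a sensor dataframe with a (run_id, sample) index and C columns, as produced by a BaseDataset child:

dataset = SlidingWindowDataset(df, target, window_size=32, stride=2)
sample, label = dataset[0]
# sample: float32 array of shape (32, C), the window ending at the selected row
# label: the target value at the window end, or a copy of the window if target is None
loader = DataLoader(dataset, batch_size=64, shuffle=True)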