# Source code for ice.anomaly_detection.models.gnn

import torch
from torch import nn
from torch.nn import functional as F
from pandas import DataFrame, Series

from ice.anomaly_detection.models.base import BaseAnomalyDetection


class GCLayer(nn.Module):
    """Graph-convolution layer: a linear projection followed by propagation
    over a normalised adjacency matrix with self-loops.
    """

    def __init__(self, in_dim: int, out_dim: int):
        """
        Args:
            in_dim (int): Size of the last dimension of the input features.
            out_dim (int): Size of the last dimension of the output features.
        """
        super().__init__()
        self.dense = nn.Linear(in_dim, out_dim)

    def forward(self, adj, x):
        """Project ``x`` and mix it across nodes.

        Args:
            adj: Square adjacency matrix; its first dimension gives the
                number of nodes.
            x: Node features whose last dimension is ``in_dim`` and whose
                second-to-last dimension indexes the nodes (required by the
                matrix product with ``adj``).

        Returns:
            Tensor shaped like ``x`` but with ``out_dim`` as last dimension.
        """
        num_nodes = adj.size(0)
        # Self-loops let every node keep its own features during propagation.
        self_loops = torch.eye(num_nodes).to(x.device)
        adj_hat = adj + self_loops

        projected = self.dense(x)

        # Entry (i, j) is scaled by rowsum_i^-1/2 * rowsum_j^-1/2.
        inv_sqrt_deg = adj_hat.sum(dim=1).pow(-0.5)
        propagation = (
            inv_sqrt_deg.unsqueeze(0) * adj_hat * inv_sqrt_deg.unsqueeze(1)
        )

        return torch.matmul(propagation, projected)


class Directed_A(nn.Module):
    """Trainable directed adjacency matrix built from two node embeddings.

    Two separate embeddings are passed through their own linear layers and
    combined with a matrix product, so the result is in general asymmetric.
    """

    def __init__(
            self,
            num_sensors: int,
            window_size: int,
            alpha: float,
            k: int
            ):
        """
        Args:
            num_sensors (int): Number of nodes (rows of the embedding tables).
            window_size (int): Dimension of each node embedding.
            alpha (float): Multiplier applied before every ``tanh``; controls
                how quickly the activations saturate.
            k (int): Maximum number of entries kept per row of the adjacency
                matrix. A falsy value (``None`` or 0) keeps all entries.
        """
        super().__init__()
        self.alpha = alpha
        self.k = k

        self.e1 = nn.Embedding(num_sensors, window_size)
        self.e2 = nn.Embedding(num_sensors, window_size)
        self.l1 = nn.Linear(window_size, window_size)
        self.l2 = nn.Linear(window_size, window_size)

    def forward(self, idx):
        """Build the adjacency matrix for the nodes listed in ``idx``.

        Args:
            idx: 1-D integer tensor of node indices.

        Returns:
            Non-negative ``(len(idx), len(idx))`` tensor with values in
            ``[0, 1)``. When ``k`` is set, at most ``k`` entries per row are
            non-zero.
        """
        m1 = torch.tanh(self.alpha * self.l1(self.e1(idx)))
        m2 = torch.tanh(self.alpha * self.l2(self.e2(idx)))
        adj = F.relu(torch.tanh(self.alpha * torch.mm(m1, m2.transpose(1, 0))))

        if self.k:
            # topk raises when k exceeds the row length, so cap it.
            k = min(self.k, adj.size(1))
            # A little random noise breaks ties between equal scores
            # (e.g. the many zeros produced by the ReLU).
            _, keep = (adj + torch.rand_like(adj) * 0.01).topk(k, 1)
            # zeros_like keeps the mask on adj's device and dtype; a default
            # float32 mask makes scatter_ fail for float64 models.
            mask = torch.zeros_like(adj)
            mask.scatter_(1, keep, 1.0)
            adj = adj * mask

        return adj


class GNNEncoder(nn.Module):
    """Encoder made of two graph-convolution layers over a learned graph.

    The adjacency matrix is produced by a ``Directed_A`` module on every
    forward pass, so the graph is trained together with the layers.
    """

    def __init__(
            self,
            num_sensors: int,
            window_size: int,
            alpha: float,
            k: int
            ):
        """
        Args:
            num_sensors (int): Number of graph nodes.
            window_size (int): Feature size per node at the input; the two
                layers reduce it to ``window_size // 2`` and then
                ``window_size // 8``.
            alpha (float): Forwarded to ``Directed_A``.
            k (int): Forwarded to ``Directed_A``.
        """
        super().__init__()
        # Non-persistent buffer: it follows the module across .to()/.cuda()
        # calls but stays out of state_dict, so checkpoint keys are unchanged.
        self.register_buffer(
            "idx", torch.arange(num_sensors), persistent=False
        )
        self.gcl1 = GCLayer(window_size, window_size // 2)
        self.gcl2 = GCLayer(window_size // 2, window_size // 8)
        self.A = Directed_A(num_sensors, window_size, alpha, k)

    def forward(self, x):
        """Encode a batch of windows.

        Args:
            x: Tensor whose dimensions 1 and 2 are swapped before the graph
                layers; since the first layer expects ``window_size`` as the
                last dimension, the input is expected to be
                ``(batch, window_size, num_sensors)``.

        Returns:
            Tensor of shape ``(batch, num_sensors, window_size // 8)``.
        """
        x = torch.transpose(x, 1, 2)
        # .to() is a no-op once the module lives on the same device as x.
        adj = self.A(self.idx.to(x.device))
        x = self.gcl1(adj, x).relu()
        x = self.gcl2(adj, x).relu()
        return x
    

class Decoder(nn.Module):
    """Two-layer MLP that expands flattened node codes back into a window."""

    def __init__(self, num_sensors: int, window_size: int):
        """
        Args:
            num_sensors (int): Number of sensors in the reconstructed window.
            window_size (int): Number of time steps in the reconstructed
                window.
        """
        super().__init__()
        self.num_sensors = num_sensors
        self.window_size = window_size

        latent_width = window_size // 8 * num_sensors
        hidden_width = window_size // 2 * num_sensors
        output_width = num_sensors * window_size
        layers = [
            nn.Linear(latent_width, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, output_width),
        ]
        self.decoder = nn.Sequential(*layers)

    def forward(self, x):
        """Reconstruct windows from encoded features.

        Args:
            x: Tensor whose non-batch dimensions flatten to
                ``window_size // 8 * num_sensors`` values.

        Returns:
            Tensor viewed as ``(-1, window_size, num_sensors)``.
        """
        flat = x.flatten(start_dim=1)
        restored = self.decoder(flat)
        return restored.view(-1, self.window_size, self.num_sensors)


class GSL_GNN(BaseAnomalyDetection):
    """
    GNN autoencoder consists of encoder with graph convolutional layers and
    MLP decoder parts. The graph describing the data is constructed during
    the training process using trainable parameters.
    """

    def __init__(
            self,
            window_size: int,
            stride: int = 1,
            batch_size: int = 128,
            lr: float = 0.001,
            num_epochs: int = 10,
            device: str = 'cpu',
            verbose: bool = False,
            name: str = 'gnn_anomaly_detection',
            random_seed: int = 42,
            val_ratio: float = 0.15,
            save_checkpoints: bool = False,
            threshold_level: float = 0.95,
            alpha: float = 0.2,
            k: int = None
            ):
        """
        Args:
            window_size (int): The window size to train the model.
            stride (int): The time interval between first points of
                consecutive sliding windows in training.
            batch_size (int): The batch size to train the model.
            lr (float): The learning rate to train the model.
            num_epochs (int): The number of epochs to train the model.
            device (str): The name of a device to train the model. `cpu` and
                `cuda` are possible.
            verbose (bool): If true, show the progress bar in training.
            name (str): The name of the model for artifact storing.
            random_seed (int): Seed for random number generation to ensure
                reproducible results.
            val_ratio (float): Proportion of the dataset used for validation,
                between 0 and 1.
            save_checkpoints (bool): If true, store checkpoints.
            threshold_level (float): Takes a value from 0 to 1. It specifies
                the quantile in the distribution of errors on the training
                dataset at which the threshold value is set.
            alpha (float): Saturation rate for adjacency matrix.
            k (int): Limit on the number of edges kept per row of the
                adjacency matrix. ``None`` (the default) keeps all edges.
        """
        super().__init__(
            window_size,
            stride,
            batch_size,
            lr,
            num_epochs,
            device,
            verbose,
            name,
            random_seed,
            val_ratio,
            save_checkpoints,
            threshold_level
        )
        self.alpha = alpha
        self.k = k

    # Extends the base-class parameter map with this model's own entry.
    # NOTE(review): only `alpha` is mapped here; `k` has no entry — confirm
    # whether that is intentional.
    _param_conf_map = dict(
        BaseAnomalyDetection._param_conf_map,
        **{
            "alpha": ["MODEL", "ALPHA"]
        }
    )

    def _create_model(self, input_dim: int, output_dim: int):
        """Build the encoder/decoder pair and store it in ``self.model``.

        Args:
            input_dim (int): Used as the number of sensors (graph nodes) for
                both the encoder and the decoder.
            output_dim (int): Not used by this model.
        """
        self.model = nn.Sequential(
            GNNEncoder(
                input_dim,
                self.window_size,
                self.alpha,
                self.k
            ),
            Decoder(input_dim, self.window_size)
        )