Source code for src.utils.generic_utils

import os
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import random
import warnings
import tensorflow as tf
import seaborn as sns
import pickle
from itertools import cycle
from sklearn.metrics import roc_curve, auc, confusion_matrix
from tensorflow.keras import backend as K
from src.utils.constants import TRAIN_FILES, TEST_FILES, NB_CLASSES_LIST


def plot_roc(y_test, y_score, figname="none", n_classes=3):
    """ Plots the ROC curve given the target class labels and the
    prediction probabilities.

    Args:
        y_test: The target class labels (one-hot encoded).
        y_score: The model's output prediction probabilities.
        figname: Name of the figure for saving (currently unused).
        n_classes: Number of classes.
    """
    sns.set_style('ticks')
    lw = 8.0
    colors = cycle(['purple', 'crimson', 'lightpink'])

    # Compute ROC curve and ROC area for each class
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_test[:, i], y_score[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Compute micro-average ROC curve and ROC area
    fpr["micro"], tpr["micro"], _ = roc_curve(y_test.ravel(), y_score.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    # First aggregate all false positive rates
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))

    # Then interpolate all ROC curves at these points
    # (np.interp replaces scipy.interp, which was removed from SciPy)
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(n_classes):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])

    # Finally average it and compute AUC
    mean_tpr /= n_classes

    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    # Plot all ROC curves
    plt.rcParams["axes.edgecolor"] = "0.15"
    plt.rcParams["axes.linewidth"] = 1.50
    plt.figure(3, figsize=(5, 5))
    plt.plot(fpr["micro"], tpr["micro"],
             label='Micro-Average ROC \n (area = {0:0.2f})'.format(roc_auc["micro"]),
             color='crimson', linestyle='-', linewidth=2.5)
    plt.plot([0, 1], [0, 1], linestyle='--', color='purple', lw=2.0)
    plt.xlim([-0.1, 1.1])
    plt.ylim([-0.1, 1.1])
    plt.xticks(fontsize=14)
    plt.yticks(fontsize=14)
    plt.xlabel('False Positive Rate', fontsize=15)
    plt.ylabel('True Positive Rate', fontsize=15)
    plt.legend(fontsize=14, ncol=1)
    plt.show()
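
A minimal usage sketch for plot_roc. The labels, scores, and class count below are hypothetical stand-ins for a real model's output; they exist only to show the expected input shapes (one-hot targets and per-class probabilities of shape (n_samples, n_classes)):

# Example usage (hypothetical data): plot the micro-average ROC curve
# for a 3-class problem with one-hot targets and random "probabilities".
from sklearn.preprocessing import label_binarize

rng = np.random.RandomState(0)
y_true = rng.randint(0, 3, size=100)                # integer class labels
y_onehot = label_binarize(y_true, classes=[0, 1, 2])  # shape (100, 3)
y_score = rng.rand(100, 3)                          # stand-in probabilities
y_score /= y_score.sum(axis=1, keepdims=True)       # rows sum to 1

plot_roc(y_onehot, y_score, n_classes=3)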
def load_dataset_at(index, normalize_timeseries=False, verbose=True,
                    is_timeseries=True):
    """ Loads a univariate dataset indexed by `utils.constants`. The dataset is
    loaded as a pandas DataFrame and preprocessed to replace missing values
    with zero.

    .. note::
        The dataset should be such that the first column corresponds to the
        target class label, i.e. a dataset consisting of N time series of
        length T would be a DataFrame of dimension N x (T + 1), where the
        first column holds the class labels.

    Args:
        index: Integer index, set inside `utils.constants`, that refers to
            the dataset.
        normalize_timeseries: Bool / Integer. Determines whether to normalize
            the timeseries. If False, does not normalize the time series.
            If True / int not equal to 2, performs standard sample-wise
            z-normalization. If 2, performs full dataset z-normalization.
        verbose: Whether to describe the dataset being loaded.

    Returns:
        A tuple (X_train, y_train, X_test, y_test, is_timeseries).
        For legacy reasons, is_timeseries is always True.
    """
    assert index < len(TRAIN_FILES), "Index invalid. Could not load dataset at %d" % index

    if verbose:
        print("Loading train / test dataset : ", TRAIN_FILES[index], TEST_FILES[index])

    if os.path.exists(TRAIN_FILES[index]):
        df = pd.read_csv(TRAIN_FILES[index], header=None, encoding='latin-1')
    elif os.path.exists(TRAIN_FILES[index][1:]):
        # fall back to the path relative to the current working directory
        df = pd.read_csv(TRAIN_FILES[index][1:], header=None, encoding='latin-1')
    else:
        raise FileNotFoundError('File %s not found!' % (TRAIN_FILES[index]))

    # remove all columns which are completely empty
    df.dropna(axis=1, how='all', inplace=True)

    # fill all missing values with 0
    df.fillna(0, inplace=True)

    # extract labels Y and normalize to [0 - (MAX - 1)] range
    y_train = df[[0]].values
    nb_classes = NB_CLASSES_LIST[index]
    y_train = (y_train - y_train.min()) / (y_train.max() - y_train.min()) * (nb_classes - 1)

    # drop labels column from train set X
    df.drop(df.columns[0], axis=1, inplace=True)

    X_train = df.values

    if is_timeseries:
        X_train = X_train[:, np.newaxis, :]

    # normalize the values
    if normalize_timeseries:
        normalize_timeseries = int(normalize_timeseries)

        if normalize_timeseries == 2:
            # full-dataset z-normalization
            X_train_mean = X_train.mean()
            X_train_std = X_train.std()
            X_train = (X_train - X_train_mean) / (X_train_std + 1e-8)
        elif normalize_timeseries == 1:
            # sample-wise z-normalization
            X_train_mean = X_train.mean(axis=-1, keepdims=True)
            X_train_std = X_train.std(axis=-1, keepdims=True)
            X_train = (X_train - X_train_mean) / (X_train_std + 1e-8)
        else:
            # sample-wise mean centering only
            X_train_mean = X_train.mean(axis=-1, keepdims=True)
            X_train = (X_train - X_train_mean)

    if verbose:
        print("Finished loading train dataset..")

    if os.path.exists(TEST_FILES[index]):
        df = pd.read_csv(TEST_FILES[index], header=None, encoding='latin-1')
    elif os.path.exists(TEST_FILES[index][1:]):
        df = pd.read_csv(TEST_FILES[index][1:], header=None, encoding='latin-1')
    else:
        raise FileNotFoundError('File %s not found!' % (TEST_FILES[index]))

    # remove all columns which are completely empty
    df.dropna(axis=1, how='all', inplace=True)

    # fill all missing values with 0
    df.fillna(0, inplace=True)

    # extract labels Y and normalize to [0 - (MAX - 1)] range
    y_test = df[[0]].values
    y_test = (y_test - y_test.min()) / (y_test.max() - y_test.min()) * (nb_classes - 1)

    # drop labels column from test set X
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = df.values

    if is_timeseries:
        X_test = X_test[:, np.newaxis, :]

    # normalize the values
    if normalize_timeseries:
        normalize_timeseries = int(normalize_timeseries)

        if normalize_timeseries == 2:
            # reuse the train set statistics for full-dataset normalization
            X_test = (X_test - X_train_mean) / (X_train_std + 1e-8)
        elif normalize_timeseries == 1:
            X_test_mean = X_test.mean(axis=-1, keepdims=True)
            X_test_std = X_test.std(axis=-1, keepdims=True)
            X_test = (X_test - X_test_mean) / (X_test_std + 1e-8)
        else:
            X_test_mean = X_test.mean(axis=-1, keepdims=True)
            X_test = (X_test - X_test_mean)

    if verbose:
        print("Finished loading test dataset..")
        print()
        print("Number of train samples : ", X_train.shape[0],
              "Number of test samples : ", X_test.shape[0])
        print("Sequence length : ", X_train.shape[-1])

    return X_train, y_train, X_test, y_test, is_timeseries
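
A minimal usage sketch for load_dataset_at. The index 0 is hypothetical; it assumes `utils.constants` defines at least one train/test file pair and that the corresponding CSV files exist on disk:

# Example usage (hypothetical index): load dataset 0 with sample-wise
# z-normalization and inspect the resulting shapes.
X_train, y_train, X_test, y_test, is_timeseries = load_dataset_at(
    0, normalize_timeseries=True, verbose=True)

print(X_train.shape)  # (N_train, 1, T) -- channel axis inserted via np.newaxis
print(y_train.shape)  # (N_train, 1)    -- labels rescaled to [0, nb_classes - 1]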