Source code for mastml.error_analysis

"""
This module contains classes for quantifying the predicted model errors (uncertainty quantification), and preparing
provided residual (true errors) predicted model error data for plotting (e.g. residual vs. error plots), or for
recalibration of model errors using the method of Palmer et al.

ErrorUtils:
    Collection of functions to conduct error analysis on certain types of models (uncertainty quantification), and prepare
    residual and model error data for plotting, as well as recalibrate model errors with various methods

CorrectionFactors
    Class for performing recalibration of model errors (uncertainty quantification) based on the method from the
    work of Palmer et al.

"""

import os
import sys
import numpy as np
import pandas as pd
from scipy.optimize import minimize
try:
    from forestci import random_forest_error
except ImportError:
    print('forestci is an optional dependency. To install the latest forestci compatible with scikit-learn>=0.24, run '
          'pip install git+git://github.com/scikit-learn-contrib/forest-confidence-interval.git')

class ErrorUtils():
    '''
    Collection of functions to conduct error analysis on certain types of models (uncertainty quantification), and to
    prepare residual and model error data for plotting, as well as recalibrate model errors with various methods

    Args:
        None

    Methods:
        _collect_error_data: method to collect all residuals, model errors, and dataset standard deviation over many data splits

            Args:
                savepath: (str), string denoting the path to save output to

                data_type: (str), string denoting the data type analyzed, e.g. train, test, leaveout

            Returns:
                model_errors: (pd.Series), series containing the predicted model errors

                residuals: (pd.Series), series containing the true model errors (residuals)

                dataset_stdev: (float), standard deviation of the data set

        _recalibrate_errors: method to recalibrate the model errors using the negative log likelihood function from the work of Palmer et al.

            Args:
                model_errors: (pd.Series), series containing the predicted (uncalibrated) model errors

                residuals: (pd.Series), series containing the true model errors (residuals)

            Returns:
                model_errors: (pd.Series), series containing the predicted (calibrated) model errors

                a: (float), the slope of the recalibration linear fit

                b: (float), the intercept of the recalibration linear fit

        _parse_error_data: method to prepare the provided residuals and model errors for plotting the binned RvE (residual vs. error) plots

            Args:
                model_errors: (pd.Series), series containing the predicted model errors

                residuals: (pd.Series), series containing the true model errors (residuals)

                dataset_stdev: (float), standard deviation of the data set

                number_of_bins: (int), the number of bins to digitize the data into for making the RvE (residual vs. error) plot

            Returns:
                bin_values: (np.array), the x-axis of the RvE plot: reduced model error values digitized into bins

                rms_residual_values: (np.array), the y-axis of the RvE plot: the RMS of the residual values digitized into bins

                num_values_per_bin: (np.array), the number of data samples in each bin

                number_of_bins: (int), the number of bins used to digitize the model error and residual data

        _get_model_errors: method for generating the model error values using either the standard deviation of weak learners or the jackknife-after-bootstrap method of Wager et al.

            Args:
                model: (mastml.models object), a MAST-ML model, e.g. SklearnModel or EnsembleModel

                X: (pd.DataFrame), dataframe of the X feature matrix

                X_train: (pd.DataFrame), dataframe of the X training data feature matrix

                X_test: (pd.DataFrame), dataframe of the X test data feature matrix

                error_method: (str), string denoting the UQ error method to use. Viable options are 'stdev_weak_learners' and 'jackknife_after_bootstrap'

                remove_outlier_learners: (bool), whether to remove weak learners whose prediction for a given data point deviates by more than 3 sigma from the average prediction (default False)

            Returns:
                model_errors: (pd.Series), series containing the predicted model errors

                num_removed_learners: (list), list of the number of removed weak learners for each data point

        _remove_outlier_preds: method to flag and remove outlier weak learner predictions

            Args:
                preds: (list), list of predicted values of a given data point from an ensemble of weak learners

            Returns:
                preds_cleaned: (list), amended list of predicted values of a given data point from an ensemble of weak learners, with predictions from outlier learners removed

                num_outliers: (int), the number of removed weak learners for the data point evaluated
    '''
    @classmethod
    def _collect_error_data(cls, savepath, data_type):
        if data_type not in ['train', 'test', 'leaveout']:
            print('Error: data_type must be one of "train", "test" or "leaveout"')
            sys.exit()
        dfs_error = list()
        dfs_residuals = list()
        dfs_ytrue = list()
        residuals_files_to_parse = list()
        error_files_to_parse = list()
        ytrue_files_to_parse = list()
        splits = list()
        for folder, subfolders, files in os.walk(savepath):
            if 'split' in folder:
                splits.append(folder)
        for path in splits:
            if os.path.exists(os.path.join(path, 'model_errors_' + str(data_type) + '.xlsx')):
                error_files_to_parse.append(os.path.join(path, 'model_errors_' + str(data_type) + '.xlsx'))
            if os.path.exists(os.path.join(path, 'residuals_' + str(data_type) + '.xlsx')):
                residuals_files_to_parse.append(os.path.join(path, 'residuals_' + str(data_type) + '.xlsx'))
            if os.path.exists(os.path.join(path, 'y_train.xlsx')):
                ytrue_files_to_parse.append(os.path.join(path, 'y_train.xlsx'))
        for file in residuals_files_to_parse:
            df = pd.read_excel(file)
            dfs_residuals.append(np.array(df['residuals']))
        for file in error_files_to_parse:
            df = pd.read_excel(file)
            dfs_error.append(np.array(df['model_errors']))
        for file in ytrue_files_to_parse:
            df = pd.read_excel(file)
            dfs_ytrue.append(np.array(df['y_train']))
        ytrue_all = np.concatenate(dfs_ytrue).ravel()
        dataset_stdev = np.std(np.unique(ytrue_all))
        model_errors = np.concatenate(dfs_error).ravel().tolist()
        residuals = np.concatenate(dfs_residuals).ravel().tolist()
        return model_errors, residuals, dataset_stdev

    @classmethod
    def _recalibrate_errors(cls, model_errors, residuals):
        corrector = CorrectionFactors(residuals=residuals, model_errors=model_errors)
        a, b = corrector.nll()
        # Shift the model errors by the correction factors a (slope) and b (intercept)
        model_errors = pd.Series(a * np.array(model_errors) + b, name='model_errors')
        return model_errors, a, b

    @classmethod
    def _parse_error_data(cls, model_errors, residuals, dataset_stdev, number_of_bins=15):
        # Normalize the residuals and model errors by the dataset stdev
        model_errors = model_errors / dataset_stdev
        residuals = residuals / dataset_stdev
        abs_res = abs(residuals)

        # Check whether the number of bins should increase, and increase it if so
        model_errors_sorted = np.sort(model_errors)
        ninety_percentile = int(len(model_errors_sorted) * 0.9)
        ninety_percentile_range = model_errors_sorted[ninety_percentile] - np.amin(model_errors)
        total_range = np.amax(model_errors) - np.amin(model_errors)
        if ninety_percentile_range / total_range < 5 / number_of_bins:
            number_of_bins = int(5 * total_range / ninety_percentile_range)

        # Set bins for calculating RMS
        upperbound = np.amax(model_errors)
        lowerbound = np.amin(model_errors)
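        # Bin edges span the observed (normalized) model-error range; with
        # endpoint=False the final edge falls short of the maximum, so the
        # np.digitize call below maps every point, including the largest
        # model errors, into one of bins 1 through number_of_bins.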
        bins = np.linspace(lowerbound, upperbound, number_of_bins, endpoint=False)

        # Create a vector determining the bin of each data point
        digitized = np.digitize(model_errors, bins)

        # Record which bins contain data (to avoid trying to do calculations on empty bins)
        bins_present = []
        for i in range(1, number_of_bins + 1):
            if i in digitized:
                bins_present.append(i)

        # Create array of weights based on counts in each bin
        weights = []
        for i in range(1, number_of_bins + 1):
            if i in digitized:
                weights.append(np.count_nonzero(digitized == i))

        # Calculate RMS of the absolute residuals
        RMS_abs_res = [np.sqrt((abs_res[digitized == bins_present[i]] ** 2).mean()) for i in range(0, len(bins_present))]
        MS_abs_res = [(abs_res[digitized == bins_present[i]] ** 2).mean() for i in range(0, len(bins_present))]
        Var_squarederr_res = [np.std(abs_res[digitized == bins_present[i]] ** 2) ** 2 for i in range(0, len(bins_present))]

        # Set the x-values to the midpoint of each bin
        bin_width = bins[1] - bins[0]
        binned_model_errors = np.zeros(len(bins_present))
        for i in range(0, len(bins_present)):
            curr_bin = bins_present[i]
            binned_model_errors[i] = bins[curr_bin - 1] + bin_width / 2

        bin_values = np.array(binned_model_errors)
        rms_residual_values = np.array(RMS_abs_res)
        ms_residual_values = np.array(MS_abs_res)
        var_sq_residual_values = np.array(Var_squarederr_res)
        num_values_per_bin = np.array(weights)

        return bin_values, rms_residual_values, num_values_per_bin, number_of_bins, ms_residual_values, var_sq_residual_values

    @classmethod
    def _get_model_errors(cls, model, X, X_train, X_test, error_method='stdev_weak_learners', remove_outlier_learners=False):
        err_down = list()
        err_up = list()
        indices_TF = list()
        X_aslist = X.values.tolist()
        if model.model.__class__.__name__ in ['RandomForestRegressor', 'GradientBoostingRegressor', 'ExtraTreesRegressor',
                                              'BaggingRegressor', 'AdaBoostRegressor']:
            if error_method == 'jackknife_after_bootstrap':
                # The Wager et al. jackknife-after-bootstrap method returns the variance; take the sqrt to get the stdev
                model_errors_var = random_forest_error(forest=model.model, X_test=X_test, X_train=X_train)
                model_errors = np.sqrt(model_errors_var)
                num_removed_learners = list()
                if remove_outlier_learners is True:
                    print("Warning: removal of outlier learners isn't supported with jackknife after bootstrap")
                for _ in model_errors:
                    num_removed_learners.append(0)
            elif error_method == 'stdev_weak_learners':
                num_removed_learners = list()
                for x in range(len(X_aslist)):
                    preds = list()
                    if model.model.__class__.__name__ == 'RandomForestRegressor':
                        for pred in model.model.estimators_:
                            preds.append(pred.predict(np.array(X_aslist[x]).reshape(1, -1))[0])
                    elif model.model.__class__.__name__ == 'BaggingRegressor':
                        for pred in model.model.estimators_:
                            if pred.__class__.__name__ == 'KerasRegressor':
                                preds.append(pred.predict(np.array(X_aslist[x]).reshape(1, -1)))
                            elif pred.__class__.__name__ == 'Sequential':
                                preds.append(pred.predict(np.array(X_aslist[x]).reshape(1, -1)))
                            else:
                                preds.append(pred.predict(np.array(X_aslist[x]).reshape(1, -1))[0])
                    elif model.model.__class__.__name__ == 'ExtraTreesRegressor':
                        for pred in model.model.estimators_:
                            preds.append(pred.predict(np.array(X_aslist[x]).reshape(1, -1))[0])
                    elif model.model.__class__.__name__ == 'GradientBoostingRegressor':
                        for pred in model.model.estimators_.tolist():
                            preds.append(pred[0].predict(np.array(X_aslist[x]).reshape(1, -1))[0])
                    elif model.model.__class__.__name__ == 'AdaBoostRegressor':
                        for pred in model.model.estimators_:
                            preds.append(pred.predict(np.array(X_aslist[x]).reshape(1, -1))[0])
                    # Flag and remove outlier predictions, which may result from e.g. numerical issues in the ensemble of models
                    if remove_outlier_learners is True:
                        preds, num_outliers = cls._remove_outlier_preds(preds=preds)
                        num_removed_learners.append(num_outliers)
                    else:
                        num_removed_learners.append(0)
                    e_down = np.std(preds)
                    e_up = np.std(preds)
                    err_down.append(e_down)
                    err_up.append(e_up)
                nan_indices = np.where(np.isnan(err_up))
                nan_indices_sorted = np.array(sorted(nan_indices[0], reverse=True))
                for i, val in enumerate(list(err_up)):
                    if i in nan_indices_sorted:
                        indices_TF.append(False)
                    else:
                        indices_TF.append(True)
                model_errors = (np.array(err_up) + np.array(err_down)) / 2
            else:
                print('ERROR: error_method must be one of "stdev_weak_learners" or "jackknife_after_bootstrap"')
                sys.exit()
        if model.model.__class__.__name__ == 'GaussianProcessRegressor':
            # Get the stdev model error directly from the predictions of the GPR
            preds = model.model.predict(X, return_std=True)[1]
            err_up = preds
            err_down = preds
            model_errors = (np.array(err_up) + np.array(err_down)) / 2
            nan_indices = np.where(np.isnan(err_up))
            nan_indices_sorted = np.array(sorted(nan_indices[0], reverse=True))
            num_removed_learners = list()
            for i, val in enumerate(list(err_up)):
                num_removed_learners.append(0)
                if i in nan_indices_sorted:
                    indices_TF.append(False)
                else:
                    indices_TF.append(True)
        model_errors = pd.Series(model_errors, name='model_errors')
        num_removed_learners = pd.Series(num_removed_learners, name='num_removed_learners')
        return model_errors, num_removed_learners

    @classmethod
    def _remove_outlier_preds(cls, preds):
        # Flag weak learner predictions lying more than n_stdev standard deviations from the ensemble mean
        n_stdev = 3
        mean_pred = np.mean(preds)
        stdev_pred = np.std(preds)
        preds_cleaned = list()
        num_outliers = 0
        for pred in preds:
            if pred < mean_pred - n_stdev * stdev_pred:
                num_outliers += 1
            elif pred > mean_pred + n_stdev * stdev_pred:
                num_outliers += 1
            else:
                preds_cleaned.append(pred)
        return np.array(preds_cleaned), num_outliers
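
# The sketch below is illustrative and not part of the original mastml API: it
# demonstrates, on synthetic data, the 'stdev_weak_learners' idea used in
# ErrorUtils._get_model_errors above, where the spread of per-tree predictions
# of a fitted RandomForestRegressor serves as the model error estimate. The
# dataset, hyperparameters, and function name are arbitrary demo choices.
def _demo_stdev_weak_learners():
    from sklearn.ensemble import RandomForestRegressor

    rng = np.random.RandomState(0)
    X = rng.uniform(-1, 1, size=(200, 3))
    y = X[:, 0] ** 2 + np.sin(X[:, 1]) + 0.1 * rng.normal(size=200)

    model = RandomForestRegressor(n_estimators=100, random_state=0).fit(X, y)

    # Spread of per-tree predictions across the ensemble, mirroring the
    # per-point loop over model.model.estimators_ in _get_model_errors
    tree_preds = np.stack([tree.predict(X) for tree in model.estimators_])
    model_errors = tree_preds.std(axis=0)
    residuals = y - model.predict(X)

    # Prepare the binned data behind a residual-vs-error (RvE) plot
    dataset_stdev = np.std(y)
    bin_values, rms_residual_values, num_values_per_bin, number_of_bins, _, _ = \
        ErrorUtils._parse_error_data(model_errors=model_errors,
                                     residuals=residuals,
                                     dataset_stdev=dataset_stdev)
    return bin_values, rms_residual_values, num_values_per_bin
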
class CorrectionFactors():
    '''
    Class for performing recalibration of model errors (uncertainty quantification) based on the method from the
    work of Palmer et al.

    Args:
        residuals: (pd.Series), series containing the true model errors (residuals)

        model_errors: (pd.Series), series containing the predicted model errors

    Methods:
        nll: Method to perform optimization of the normalized model error distribution using the negative log likelihood function

            Args:
                None

            Returns:
                a: (float), the slope of the recalibration linear fit

                b: (float), the intercept of the recalibration linear fit

        _nll_opt: Method for evaluating the negative log likelihood function for a set of residuals and model errors

            Args:
                x: (np.array), array providing the current guess of (a, b)

            Returns:
                _ : (float), the mean negative log likelihood of the residuals given the recalibrated model errors
    '''
    def __init__(self, residuals, model_errors):
        self.residuals = residuals
        self.model_errors = model_errors
    def nll(self):
        x0 = np.array([1.0, 0.0])
        res = minimize(self._nll_opt, x0, method='nelder-mead')
        a = res.x[0]
        b = res.x[1]
        if res.success is False:
            print("Warning: NLL optimization failed!")
        return a, b
    def _nll_opt(self, x):
        # Mean negative log likelihood of the residuals r_i under a Gaussian with
        # stdev sigma_i = a * s_i + b, where s_i are the uncalibrated model errors
        # and x = (a, b): 0.5 * mean(log(2*pi) + log(sigma_i^2) + r_i^2 / sigma_i^2)
        total = 0
        for i in range(0, len(self.residuals)):
            total += np.log(2 * np.pi) + np.log((x[0] * self.model_errors[i] + x[1]) ** 2) + \
                     (self.residuals[i]) ** 2 / (x[0] * self.model_errors[i] + x[1]) ** 2
        return 0.5 * total / len(self.residuals)
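
# The check below is illustrative and not part of the original mastml API: it
# builds synthetic uncalibrated model errors s and residuals drawn from
# N(0, (a_true*s + b_true)^2), then verifies that CorrectionFactors.nll()
# approximately recovers the chosen slope and intercept. The names a_true,
# b_true, the sample size, and the function name are arbitrary demo choices.
def _demo_recalibration():
    rng = np.random.RandomState(0)
    a_true, b_true = 2.0, 0.1

    # Uncalibrated model errors and residuals consistent with the linear recalibration model
    model_errors = pd.Series(rng.uniform(0.1, 1.0, size=2000), name='model_errors')
    residuals = pd.Series(rng.normal(0.0, a_true * model_errors + b_true), name='residuals')

    corrector = CorrectionFactors(residuals=residuals, model_errors=model_errors)
    a, b = corrector.nll()
    print('Recovered a = %.3f (true %.1f), b = %.3f (true %.1f)' % (a, a_true, b, b_true))
    return a, b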