Source code for mastml.legos.model_finder

"""
This module provides a name_to_constructor dict for all models/estimators in scikit-learn, plus a couple test models and
error handling functions
"""

import warnings
import inspect

import sklearn.base
import sklearn.utils
import joblib
import numpy as np
import os

# Sometimes xgboost is hard to install, so make it optional
try:
    import xgboost as xgb
except ImportError:
    print('If you want to use XGBoost models, please manually install the xgboost package with '
          'pip install xgboost. If you get an error finding the libxgboost.dylib library, run '
          'brew install libomp. If you do not have brew on your system, first run '
          'ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" from the Terminal')


try:
    import keras
    from keras.models import model_from_json
    from keras.models import load_model
    from keras.models import Sequential
except ImportError:
    print('Keras is an optional dependency. To use it, run pip install keras tensorflow')

import random

import pandas as pd

#from . import keras_models
from mastml import utils

from scipy import stats

with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    name_to_constructor = dict(sklearn.utils.all_estimators())
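
# A quick illustration (estimator name assumed; any scikit-learn estimator works):
# every estimator class can now be looked up by its string name and instantiated as usual.
# >>> Ridge = name_to_constructor['Ridge']
# >>> model = Ridge(alpha=1.0)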

class AlwaysFive(sklearn.base.RegressorMixin):
    """
    Class used as a test model that always predicts a value of 5.

    Args:
        constant: (int), the value to predict. Always 5 by default

    Methods:
        fit: Just passes through to maintain scikit-learn structure

        predict: Provides predicted model values based on X features

            Args:
                X: (numpy array), array of X features

            Returns:
                (numpy array), prediction array where all values are equal to constant
    """
    def __init__(self, constant=5):
        self.five = constant

    def fit(self, X, y, groups=None):
        return self

    def predict(self, X):
        return np.array([self.five for _ in range(len(X))])
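
# A minimal sketch of how this test model behaves (the input array is illustrative):
# >>> model = AlwaysFive().fit(X=None, y=None)
# >>> model.predict(np.zeros((3, 2)))
# array([5, 5, 5])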

class RandomGuesser(sklearn.base.RegressorMixin):
    """
    Class used as a test model that always predicts random values for y data.

    Args:
        None

    Methods:
        fit: Constructs possible predicted values based on y data

            Args:
                y: (numpy array), array of y data

        predict: Provides predicted model values based on X features

            Args:
                X: (numpy array), array of X features

            Returns:
                (numpy array), prediction array where all values are random selections of y data
    """
    def __init__(self):
        pass

    def fit(self, X, y, groups=None):
        self.possible_answers = y
        return self

    def predict(self, X):
        return np.random.choice(self.possible_answers, size=X.shape[0])
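
# A minimal sketch (illustrative values): predictions are drawn at random from the
# y values seen during fit.
# >>> model = RandomGuesser().fit(X=None, y=np.array([1.0, 2.0, 3.0]))
# >>> model.predict(np.zeros((4, 2)))  # four values sampled from [1.0, 2.0, 3.0]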

class KerasRegressor():
    def __init__(self, conf_dict):
        self.conf_dict = conf_dict
        self.model = self.build_model()

    def build_model(self):
        model_vals = self.conf_dict
        model = Sequential()
        for layer_dict, layer_val in model_vals.items():
            if layer_dict != 'FitParams':
                layer_name_asstr = layer_val.get('layer_type')
                if layer_name_asstr == 'Dense':
                    neuron_num = int(layer_val.get('neuron_num'))
                    if layer_dict == 'Layer1':
                        input_dim = int(layer_val.get('input_dim'))
                    kernel_initializer = layer_val.get('kernel_initializer')
                    activation = layer_val.get('activation')
                elif layer_name_asstr == 'Dropout':
                    rate = float(layer_val.get('rate'))
                # Resolve the layer class (e.g. keras.layers.Dense) from its string name
                layer_type = getattr(keras.layers, layer_name_asstr)
            else:
                # Parse training settings, falling back to defaults where unspecified
                if layer_val.get('rate'):
                    self.rate = float(layer_val.get('rate'))
                if layer_val.get('epochs'):
                    self.epochs = int(layer_val.get('epochs'))
                else:
                    self.epochs = 1
                if layer_val.get('batch_size'):
                    self.batch_size = int(layer_val.get('batch_size'))
                else:
                    self.batch_size = None
                if layer_val.get('loss'):
                    self.loss = str(layer_val.get('loss'))
                else:
                    self.loss = 'mean_squared_error'
                if layer_val.get('optimizer'):
                    self.optimizer = str(layer_val.get('optimizer'))
                else:
                    self.optimizer = 'adam'
                if layer_val.get('metrics'):
                    self.metrics = layer_val.get('metrics').split(',')
                else:
                    self.metrics = ['mae']
                if layer_val.get('verbose'):
                    # keras expects an int verbosity level (0, 1 or 2), not a string
                    self.verbose = int(layer_val.get('verbose'))
                else:
                    self.verbose = 0
                if layer_val.get('shuffle'):
                    # bool() of any nonempty string is always True, so parse the value explicitly
                    self.shuffle = str(layer_val.get('shuffle')).lower() in ('true', '1')
                else:
                    self.shuffle = True
                if layer_val.get('validation_split'):
                    self.validation_split = float(layer_val.get('validation_split'))
                else:
                    self.validation_split = 0.0
                continue
            if layer_dict == 'Layer1':
                model.add(layer_type(neuron_num, input_dim=input_dim,
                                     kernel_initializer=kernel_initializer, activation=activation))
            else:
                if layer_name_asstr == 'Dense':
                    model.add(layer_type(neuron_num, kernel_initializer=kernel_initializer,
                                         activation=activation))
                if layer_name_asstr == 'Dropout':
                    model.add(layer_type(rate=rate))
        return model

    def fit(self, X, Y):
        # Need to rebuild and re-compile the model at every fit so that weights from
        # other fits do not carry over
        self.model = self.build_model()
        self.model.compile(loss=self.loss, optimizer=self.optimizer, metrics=self.metrics)
        return self.model.fit(X, Y, epochs=self.epochs, batch_size=self.batch_size,
                              verbose=self.verbose, validation_split=self.validation_split,
                              shuffle=self.shuffle)

    def predict(self, X):
        return self.model.predict(X)

    def summary(self):
        return self.model.summary()
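
# A hypothetical conf_dict sketch showing the structure build_model() parses: one entry
# per layer (the first must be keyed 'Layer1', be Dense, and carry input_dim), plus a
# 'FitParams' entry for training settings. Note that fit() defaults are only populated
# when a 'FitParams' entry is present. All names and values below are illustrative.
# example_conf = {
#     'Layer1': {'layer_type': 'Dense', 'neuron_num': '64', 'input_dim': '10',
#                'kernel_initializer': 'normal', 'activation': 'relu'},
#     'Layer2': {'layer_type': 'Dropout', 'rate': '0.2'},
#     'Layer3': {'layer_type': 'Dense', 'neuron_num': '1',
#                'kernel_initializer': 'normal', 'activation': 'linear'},
#     'FitParams': {'epochs': '50', 'batch_size': '32', 'loss': 'mean_squared_error',
#                   'optimizer': 'adam', 'verbose': '0'}
# }
# model = KerasRegressor(example_conf)
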
# ref: https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.BaggingRegressor.html#sklearn.ensemble.BaggingRegressor
# NOTE: in order to use this, other models for the custom ensemble must be defined
# in the conf file with "_ensemble" somewhere in the name
class EnsembleRegressor():
    def __init__(self, n_estimators, num_samples, model_list, num_models):
        self.model_list = model_list  # list of model instances, one per model type
        self.num_models = num_models  # how many of each of the specified models should be included in the ensemble
        self.n_estimators = sum(self.num_models)
        self.num_samples = num_samples
        self.max_samples = num_samples
        self.bootstrapped_datasets = []
        self.bootstrapped_idxs = []
        self.all_preds = []
        self.path = ""
        self.model = self.build_models()  # actually a list of models for use as the members in the ensemble
        self.fold = -1
        self.bootstrap = True

    def build_models(self):
        model = []
        for i, num_m in enumerate(self.num_models):
            for j in range(num_m):
                model.append(self.model_list[i])
        return model

    def setup(self, path):
        self.fold += 1
        self.bootstrapped_idxs = []
        self.bootstrapped_datasets = []
        self.path = path

    def fit(self, X, Y):
        X = X.values
        Y = Y.values
        idxs = np.arange(len(X))
        # fit each model in the ensemble
        for i in range(self.n_estimators):
            model = self.model[i]
            # do bootstrapping given the validation data
            bootstrap_idxs = random.choices(idxs, k=self.num_samples)
            bootstrap_X = X[bootstrap_idxs]
            bootstrap_Y = Y[bootstrap_idxs]
            if bootstrap_X.ndim == 1:
                bootstrap_X = np.expand_dims(np.asarray(bootstrap_X), -1)
            if bootstrap_Y.ndim == 1:
                bootstrap_Y = np.expand_dims(np.asarray(bootstrap_Y), -1)
            self.bootstrapped_idxs.append(bootstrap_idxs)
            self.bootstrapped_datasets.append(bootstrap_X)
            model.fit(bootstrap_X, bootstrap_Y)

    def predict(self, X, return_std=False):
        if isinstance(X, pd.DataFrame):
            X = X.values
        all_preds = []
        means = []
        for x_i in range(len(X)):
            preds = []
            for i in range(self.n_estimators):
                sample_X = X[x_i]
                if sample_X.ndim == 1:
                    sample_X = np.expand_dims(np.asarray(sample_X), 0)
                preds.append(self.model[i].predict(sample_X))
            all_preds.append(preds)
            means.append(np.mean(preds))
        # NOTE for ref (if manual jackknife implementation is necessary)
        # https://www.jpytr.com/post/random_forests_and_jackknife_variance/
        # https://github.com/scikit-learn-contrib/forest-confidence-interval/tree/master/forestci
        # http://contrib.scikit-learn.org/forest-confidence-interval/reference/forestci.html
        self.all_preds = all_preds
        return np.asarray(means)

    # check for failed fits, warn users, and re-calculate
    def stats_check_models(self, X, Y):
        if self.n_estimators > 10:
            maes = []
            for i in range(self.n_estimators):
                abs_errors = np.absolute(np.absolute(np.squeeze(np.asarray(self.all_preds)[:, i])) - Y)
                maes.append(sum(abs_errors) / len(abs_errors))
            alpha = 0.01
            bad_idxs = []
            for i in range(self.n_estimators):
                other_maes = np.delete(maes, [i])
                # ref: https://towardsdatascience.com/statistical-significance-hypothesis-testing-the-normal-curve-and-p-values-93274fa32687
                z_score = (maes[i] - np.mean(other_maes)) / np.std(other_maes)
                # ref: https://stackoverflow.com/questions/3496656/convert-z-score-z-value-standard-score-to-p-value-for-normal-distribution-in/3508321
                p_val = stats.norm.sf(abs(z_score)) * 2
                if p_val <= alpha:
                    # TODO: log these properly instead of printing
                    print("Estimator {} failed under statistical significance threshold {} (p_val {}), "
                          "relevant dataset output to file with name format "
                          "'<fold>_<estimator idx>_bootstrapped_dataset.csv'".format(i, alpha, p_val))
                    print("bad estimator mae: {}".format(maes[i]))
                    print("mean mae (for ref): {}".format(np.mean(maes)))
                    # use os.path.join so the output path also works on non-Windows systems
                    np.savetxt(os.path.join(self.path, "{}_{}_bootstrapped_dataset.csv".format(self.fold, i)),
                               self.bootstrapped_datasets[i], delimiter=",")
                    bad_idxs.append(i)
            if len(bad_idxs) == self.n_estimators:
                print("All models failed the statistical significance check; inspect your data")
                return
            #self.all_preds = np.delete(self.all_preds, bad_idxs, 1)
        y_preds = []
        for idx, x_i in enumerate(self.all_preds):
            y_preds.append(np.mean(x_i))
        return np.asarray(y_preds)
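
# A usage sketch with assumed names (Ridge and DecisionTreeRegressor are stand-ins; any
# scikit-learn regressors work). Note that build_models() reuses each model_list instance
# num_models[i] times, and fit() expects pandas objects since it calls .values.
# >>> from sklearn.linear_model import Ridge
# >>> from sklearn.tree import DecisionTreeRegressor
# >>> ens = EnsembleRegressor(n_estimators=6, num_samples=100,
# ...                         model_list=[Ridge(), DecisionTreeRegressor()],
# ...                         num_models=[3, 3])
# >>> ens.setup(path='some/output/dir')  # hypothetical output directory
# >>> ens.fit(X_train, y_train)          # X_train, y_train: pandas DataFrame/Series
# >>> y_pred = ens.predict(X_test)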

class ModelImport():
    """
    Class used to import pickled models from previous machine learning fits

    Args:
        model_path (str): string designating the path to load the saved .pkl model file

    Methods:
        fit: Does nothing, present for compatibility purposes

            Args:
                X: Nonetype
                y: Nonetype
                groups: Nonetype

        predict: Provides predicted model values based on X features

            Args:
                X: (numpy array), array of X features

            Returns:
                (numpy array), prediction array using imported model
    """
    def __init__(self, model_path):
        self.model_path = model_path
        self.model = joblib.load(self.model_path)

    def fit(self, X=None, y=None, groups=None):
        """ Only here for compatibility """
        return

    def predict(self, X):
        return self.model.predict(X)
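
# A usage sketch (the .pkl path is hypothetical): reload a previously pickled model
# and use it for prediction only.
# >>> model = ModelImport(model_path='results/my_saved_model.pkl')
# >>> y_pred = model.predict(X_test)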

# Optional to have xgboost working
try:
    custom_models = {
        'AlwaysFive': AlwaysFive,
        'RandomGuesser': RandomGuesser,
        'ModelImport': ModelImport,
        'XGBRegressor': xgb.XGBRegressor,
        'XGBClassifier': xgb.XGBClassifier,
        'KerasRegressor': KerasRegressor,
        'EnsembleRegressor': EnsembleRegressor
        #'DNNClassifier': keras_models.DNNClassifier
    }
except NameError:
    custom_models = {
        'AlwaysFive': AlwaysFive,
        'RandomGuesser': RandomGuesser,
        'ModelImport': ModelImport,
        'KerasRegressor': KerasRegressor,
        'EnsembleRegressor': EnsembleRegressor
        #'DNNClassifier': keras_models.DNNClassifier
    }

name_to_constructor.update(custom_models)

def find_model(model_name):
    """
    Method used to map a model/estimator name to its constructor

    Args:
        model_name: (str), the name of a model/estimator

    Returns:
        (class), the model/estimator constructor, or raises an InvalidModel error if the name is not recognized
    """
    try:
        return name_to_constructor[model_name]
    except KeyError:
        raise utils.InvalidModel(f"Model '{model_name}' does not exist in scikit-learn.")
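
# For illustration (scikit-learn names assumed):
# >>> find_model('KernelRidge')      # returns sklearn.kernel_ridge.KernelRidge
# >>> find_model('NotARealModel')    # raises utils.InvalidModel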

def check_models_mixed(model_names):
    """
    Method used to check whether the user has mixed regression and classification tasks

    Args:
        model_names: (list), list containing names of models/estimators

    Returns:
        (bool), whether or not a classifier was found, or raises an exception if both regression and
        classification models are present.
    """
    found_classifier = found_regressor = False
    for name in model_names:
        if name in custom_models:
            continue
        class1 = find_model(name)
        if issubclass(class1, sklearn.base.ClassifierMixin):
            found_classifier = True
        elif issubclass(class1, sklearn.base.RegressorMixin):
            found_regressor = True
        else:
            raise Exception(f"Model '{name}' is neither a classifier nor a regressor")
    if found_classifier and found_regressor:
        raise Exception("Both classifier and regressor models have been included")
    return found_classifier
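
# For illustration (scikit-learn names assumed):
# >>> check_models_mixed(['KernelRidge', 'RandomForestRegressor'])   # returns False
# >>> check_models_mixed(['KernelRidge', 'RandomForestClassifier'])  # raises Exception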