"""
Main MAST-ML module responsible for executing the workflow of a MAST-ML run
"""
import argparse
import inspect
import os
import shutil
import logging
import warnings
import re
import json
from datetime import datetime
from collections import OrderedDict
from os.path import join # We use join tons
from functools import reduce
from contextlib import redirect_stdout
from copy import deepcopy
import numpy as np
import pandas as pd
import joblib
from sklearn.exceptions import UndefinedMetricWarning
from sklearn.model_selection import LeaveOneGroupOut
from sklearn.metrics import make_scorer
from sklearn.base import clone
from mastml import conf_parser, data_loader, html_helper, plot_helper, utils, learning_curve, data_cleaner, metrics
from mastml.legos import (data_splitters, feature_generators, feature_normalizers,
feature_selectors, model_finder, util_legos, randomizers, hyper_opt)
from mastml.legos import clusterers as legos_clusterers
from mastml.legos import model_hosting
log = logging.getLogger('mastml')
[docs]def main(conf_path, data_path, outdir=join(os.getcwd(), 'results_mastml_run'), verbosity=0):
"""
This method is responsible for setting up the initial stage of the MAST-ML run, such as parsing input directories to
designate where data will be imported and results saved to, as well as creation of the MAST-ML run log.
Args:
conf_path: (str), the path supplied by the user which contains the input configuration file
data_path: (str), the path supplied by the user which contains the input data file (as CSV or XLSX)
outdir: (str), the path supplied by the user which determines where the output results are saved to
verbosity: (int), the verbosity level of the MAST-ML log, which determines the amount of information written to the log.
Returns:
outdir: (str), the path supplied by the user which determines where the output results are saved to (needed by other calls in MAST-ML)
"""
conf_path, data_path, outdir = check_paths(conf_path, data_path, outdir)
utils.activate_logging(outdir, (str(conf_path), str(data_path), outdir), verbosity=verbosity)
if verbosity >= 1:
warnings.simplefilter('error') # turn warnings into errors
elif verbosity <= -1:
warnings.simplefilter('ignore') # ignore warnings
try:
mastml_run(conf_path, data_path, outdir)
except utils.MastError as e:
# catch user errors, log and print, but don't raise and show them that nasty stack
log.error(str(e))
except Exception as e:
# catch the error, save it to file, then raise it back up
log.error('A runtime exception has occured, please go to '
'https://github.com/uw-cmg/MAST-ML/issues and post your issue.')
log.exception(e)
raise e
return outdir # so a calling program can know where we actually saved it
[docs]def mastml_run(conf_path, data_path, outdir):
"""
This method is responsible for conducting the main MAST-ML run workflow
Args:
conf_path: (str), the path supplied by the user which contains the input configuration file
data_path: (str), the path supplied by the user which contains the input data file (as CSV or XLSX)
outdir: (str), the path supplied by the user which determines where the output results are saved to
Returns:
None
"""
" Runs operations specifed in conf_path on data_path and puts results in outdir "
# Copy the original input files to the output directory for easy reference
log.info("Copying input files to output directory...")
if type(conf_path) is str:
shutil.copy2(conf_path, outdir)
elif type(conf_path) is dict:
with open(join(outdir, 'conf_file.conf'), 'w') as f:
json.dump(conf_path, f)
if type(data_path) is str:
shutil.copy2(data_path, outdir)
elif type(data_path) is type(pd.DataFrame()):
data_path.to_excel(join(outdir, 'input_data.xlsx'), index=False)
data_path = join(outdir, 'input_data.xlsx')
# Load in and parse the configuration and data files:
if type(conf_path) is str:
conf = conf_parser.parse_conf_file(conf_path, from_dict=False)
elif type(conf_path) is dict:
conf = conf_parser.parse_conf_file(conf_path, from_dict=True)
else:
raise TypeError('Your conf_path must either be a path string to a .conf file or a dict directly containing the config info')
MiscSettings = conf['MiscSettings']
is_classification = conf['is_classification']
# The df is used by feature generators, clusterers, and grouping_column to
# create more features for x.
# X is model input, y is target feature for model
df, X, X_noinput, X_grouped, y = data_loader.load_data(data_path,
conf['GeneralSetup']['input_features'],
conf['GeneralSetup']['input_target'],
conf['GeneralSetup']['input_grouping'],
conf['GeneralSetup']['input_other'])
if not conf['GeneralSetup']['input_grouping']:
X_grouped = pd.DataFrame()
# Perform data cleaning here
dc = conf['DataCleaning']
if 'cleaning_method' not in dc.keys():
log.warning("You have chosen not to specify a method of data_cleaning in the input file. By default, any feature entries "
"containing NaN will result in removal of the feature and any target data entries containing NaN will "
"result in removal of that target data point.")
dc['cleaning_method'] = 'remove'
if X.shape[1] == 0:
# There are no X feature vectors specified, so can't clean data
log.warning("There are no X feature vectors imported from the data file. Therefore, data cleaning cannot be performed.")
else:
# Always scan the input data and flag potential outliers
data_cleaner.flag_outliers(df=df, conf_not_input_features=conf['GeneralSetup']['input_other'],
savepath=outdir,
n_stdevs=3)
if dc['cleaning_method'] == 'remove':
df, nan_indices = data_cleaner.remove(df, axis=1)
X, nan_indices = data_cleaner.remove(X, axis=1)
X_noinput, nan_indices = data_cleaner.remove(X_noinput, axis=1)
X_grouped, nan_indices = data_cleaner.remove(X_grouped, axis=1)
elif dc['cleaning_method'] == 'imputation':
log.warning("You have selected data cleaning with Imputation. Note that imputation will not resolve missing target data. "
"It is recommended to remove missing target data")
if 'imputation_strategy' not in dc.keys():
log.warning("You have chosen to perform data imputation but have not selected an imputation strategy. By default, "
"the mean will be used as the imputation strategy")
dc['imputation_strategy'] = 'mean'
df = data_cleaner.imputation(df, dc['imputation_strategy'], X_noinput.columns)
X = data_cleaner.imputation(X, dc['imputation_strategy'])
elif dc['cleaning_method'] == 'ppca':
log.warning("You have selected data cleaning with PPCA. Note that PPCA will not work to estimate missing target values, "
"at least a 2D matrix is needed. It is recommended you remove missing target data")
df = data_cleaner.ppca(df, X_noinput.columns)
X = data_cleaner.ppca(X)
else:
log.error("You have specified an invalid data cleaning method. Choose from: remove, imputation, or ppca")
exit()
# Check if any y target data values are missing or NaN
shape_before = y.shape
y, nan_indices = data_cleaner.remove(y, axis=0)
shape_after = y.shape
if shape_after != shape_before:
log.info(
'Warning: some y target data rows were automatically removed because they were either empty or contained '
'"NaN" entries. MAST-ML will continue your run with this modified data set.')
# Need to modify df, X, etc. to remove same rows as were removed from y target data
df = df.drop(labels=nan_indices, axis=0)
X = X.drop(labels=nan_indices, axis=0)
X_noinput = X_noinput.drop(labels=nan_indices, axis=0)
if X_grouped.shape[0] > 0:
X_grouped = X_grouped.drop(labels=nan_indices, axis=0)
# randomly shuffles y values if randomizer is on
if conf['GeneralSetup']['randomizer'] is True:
log.warning("Randomizer is enabled, so target feature will be shuffled,"
" and results should be null for a given model")
y = randomizers.Randomizer().fit().transform(df=y)
"""
# get parameters out for 'validation_column'
is_validation = 'validation_columns' in conf['GeneralSetup']
if is_validation:
if type(conf['GeneralSetup']['validation_columns']) is list:
validation_column_names = list(conf['GeneralSetup']['validation_columns'])
elif type(conf['GeneralSetup']['validation_columns']) is str:
validation_column_names = list()
validation_column_names.append(conf['GeneralSetup']['validation_columns'][:])
validation_columns = dict()
for validation_column_name in validation_column_names:
validation_columns[validation_column_name] = df[validation_column_name]
validation_columns = pd.DataFrame(validation_columns)
validation_X = list()
validation_y = list()
# TODO make this block its own function
for validation_column_name in validation_column_names:
# X_, y_ = _exclude_validation(X, validation_columns[validation_column_name]), _exclude_validation(y, validation_columns[validation_column_name])
validation_X.append(pd.DataFrame(_exclude_validation(X, validation_columns[validation_column_name])))
validation_y.append(pd.DataFrame(_exclude_validation(y, validation_columns[validation_column_name])))
idxy_list = list()
for i, _ in enumerate(validation_y):
idxy_list.append(validation_y[i].index)
# Get intersection of indices between all prediction columns
intersection = reduce(np.intersect1d, (i for i in idxy_list))
X_novalidation = X.iloc[intersection]
y_novalidation = y.iloc[intersection]
X_grouped_novalidation = X_grouped.iloc[intersection]
else:
X_novalidation = X
y_novalidation = y
X_grouped_novalidation = X_grouped
"""
if conf['MiscSettings']['plot_target_histogram']:
# First, save input data stats to csv
y.describe().to_csv(join(outdir, 'input_data_statistics.csv'))
plot_helper.plot_target_histogram(y, join(outdir, 'target_histogram.png'), label=y.name)
# Get the appropriate collection of metrics:
metrics_dict = conf['GeneralSetup']['metrics']
# Extract columns that some splitter need to do grouped splitting using 'grouping_column'
# special argument
splitter_to_group_names = _extract_grouping_column_names(conf['DataSplits'])
log.debug('splitter_to_group_names:\n' + str(splitter_to_group_names))
# Instantiate models first so we can snatch them and pass them into feature selectors
models = _instantiate(conf['Models'],
model_finder.name_to_constructor,
'model')
# Need to specially snatch the GPR model if it is in models list because it contains special kernel object. Do
# this before setting up feature selectors in case GPR used in e.g. forward selection
models = _snatch_gpr_model(models, conf['Models'])
original_models = models
models = _snatch_models(models, conf['FeatureSelection'])
# Instantiate all the sections of the conf file:
generators = _instantiate(conf['FeatureGeneration'],
feature_generators.name_to_constructor,
'featuregenerator')
clusterers = _instantiate(conf['Clustering'],
legos_clusterers.name_to_constructor,
'clusterer')
normalizers = _instantiate(conf['FeatureNormalization'],
feature_normalizers.name_to_constructor,
'featurenormalizer')
splitters = _instantiate(conf['DataSplits'],
data_splitters.name_to_constructor,
'datasplit')
def snatch_model_cv_and_scoring_for_learning_curve(models):
models = OrderedDict(models)
if conf['LearningCurve']:
# Get model
name = conf['LearningCurve']['estimator']
conf['LearningCurve']['estimator'] = models[name]
del models[name]
# Get cv
name = conf['LearningCurve']['cv']
splitter_count = 0
for splitter in splitters:
if name in splitter:
conf['LearningCurve']['cv'] = splitter[1]
break
else:
splitter_count += 1
del splitters[splitter_count]
return models
models = snatch_model_cv_and_scoring_for_learning_curve(models=models)
models = _snatch_keras_model(models, conf['Models'])
original_models = models
# init of ensemble models
for long_name, (name, kwargs) in conf['Models'].items():
if 'EnsembleRegressor' in long_name:
sub_models = []
sub_models_names = models[long_name].model
for submodel_long_name in sub_models_names:
for sm_long_name, (sm_name, sm_kwargs) in conf['Models'].items():
if sm_long_name in submodel_long_name:
sm = None
if 'KerasRegressor' in sm_long_name:
sm = model_finder.KerasRegressor(conf['Models']['KerasRegressor_ensemble'][1])
else:
sm = clone(models[sm_long_name])
sub_models.append(sm)
break
models[long_name].model = sub_models
for long_name, (name, kwargs) in conf['Models'].items():
if '_ensemble' in long_name:
del models[long_name]
# Need to snatch models and CV objects for Hyperparam Opt
hyperopt_params = _snatch_models_cv_for_hyperopt(conf, models, splitters, is_classification)
hyperopts = _instantiate(hyperopt_params,
hyper_opt.name_to_constructor,
'hyperopt')
hyperopts = OrderedDict(hyperopts)
hyperopts = list(hyperopts.items())
# Snatch splitter for use in feature selection, particularly RFECV
splitters = OrderedDict(splitters) # for easier modification
_snatch_splitters(splitters, conf['FeatureSelection'])
splitters = list(splitters.items())
selectors = _instantiate(conf['FeatureSelection'],
feature_selectors.name_to_constructor,
'featureselector', X_grouped=np.array(X_grouped).reshape(-1, ), X_indices=np.array(X.index.tolist()).reshape(-1, 1))
log.debug(f'generators: \n{generators}')
log.debug(f'clusterers: \n{clusterers}')
log.debug(f'normalizers: \n{normalizers}')
log.debug(f'hyperopts: \n{hyperopts}')
log.debug(f'selectors: \n{selectors}')
log.debug(f'splitters: \n{splitters}')
# TODO make this block its own function, and change naming from is_validation to is_test here and throughout. Just symantic annoyance.
# get parameters out for 'validation_column'
is_validation = 'input_testdata' in conf['GeneralSetup']
if is_validation:
if type(conf['GeneralSetup']['input_testdata']) is list:
validation_column_names = list(conf['GeneralSetup']['input_testdata'])
elif type(conf['GeneralSetup']['input_testdata']) is str:
validation_column_names = list()
validation_column_names.append(conf['GeneralSetup']['input_testdata'][:])
validation_columns = dict()
for validation_column_name in validation_column_names:
validation_columns[validation_column_name] = df[validation_column_name]
validation_columns = pd.DataFrame(validation_columns)
validation_X = list()
validation_y = list()
def do_all_combos(X, y, df):
log.info(f"There are {len(normalizers)} feature normalizers, {len(hyperopts)} hyperparameter optimizers, "
f"{len(selectors)} feature selectors, {len(models)} models, and {len(splitters)} splitters.")
def generate_features():
log.info("Doing feature generation...")
dataframes = [instance.fit_transform(df, y) for _, instance in generators]
dataframe = pd.concat(dataframes, 1)
log.info("Saving generated data to csv...")
log.debug(f'generated cols: {dataframe.columns}')
filename = join(outdir, "generated_features.csv")
pd.concat([dataframe, X_noinput, y], 1).to_csv(filename, index=False)
return dataframe
generated_df = generate_features()
def remove_constants():
dataframe = _remove_constant_features(generated_df)
log.info("Saving generated data without constant columns to csv...")
filename = join(outdir, "generated_features_no_constant_columns.csv")
pd.concat([dataframe, X_noinput, y], 1).to_csv(filename, index=False)
return dataframe
generated_df = remove_constants()
# add in generated features
X = pd.concat([X, generated_df], axis=1)
# add in generated features to full dataframe
df = pd.concat([df, generated_df], axis=1)
# Check size of X; if there are no feature columns then throw error
if X.shape[1] == 0:
raise utils.InvalidValue('No feature vectors were found in the dataframe. Please either use feature generation methods'
'or specify input_features in the input file.')
# remove repeat columns (keep the first one)
def remove_repeats(X):
repeated_columns = X.loc[:, X.columns.duplicated()].columns
if not repeated_columns.empty:
log.warning(f"Throwing away {len(repeated_columns)} because they are repeats.")
log.debug(f"Throwing away columns because they are repeats: {repeated_columns}")
X = X.loc[:,~X.columns.duplicated()]
return X
X = remove_repeats(X)
# TODO make this block its own function
# get parameters out for 'validation_column'
if is_validation:
for validation_column_name in validation_column_names:
# X_, y_ = _exclude_validation(X, validation_columns[validation_column_name]), _exclude_validation(y, validation_columns[validation_column_name])
validation_X.append(pd.DataFrame(_exclude_validation(X, validation_columns[validation_column_name])))
validation_y.append(pd.DataFrame(_exclude_validation(y, validation_columns[validation_column_name])))
idxy_list = list()
for i, _ in enumerate(validation_y):
idxy_list.append(validation_y[i].index)
# Get intersection of indices between all prediction columns
intersection = reduce(np.intersect1d, (i for i in idxy_list))
X_novalidation = X.iloc[intersection]
y_novalidation = y.iloc[intersection]
if conf['GeneralSetup']['input_grouping']:
X_grouped_novalidation = X_grouped.iloc[intersection]
else:
X_grouped_novalidation = pd.DataFrame()
else:
X_novalidation = X
y_novalidation = y
if conf['GeneralSetup']['input_grouping']:
X_grouped_novalidation = X_grouped
else:
X_grouped_novalidation = pd.DataFrame()
def make_clustered_df():
log.info("Doing clustering...")
clustered_df = pd.DataFrame()
for name, instance in clusterers:
clustered_df[name] = instance.fit_predict(X, y)
return clustered_df
clustered_df = make_clustered_df() # Each column is a clustering algorithm
def make_feature_vs_target_plots():
if clustered_df.empty:
for column in X: # plot y against each x column
filename = f'{column}_vs_target_scatter.png'
plot_helper.plot_scatter(X[column], y, join(outdir, filename),
xlabel=column, groups=None, label=y.name)
else:
for name in clustered_df.columns: # for each cluster, plot y against each x column
for column in X:
filename = f'{column}_vs_target_by_{name}_scatter.png'
plot_helper.plot_scatter(X[column], y, join(outdir, filename),
clustered_df[name], xlabel=column, label=y.name)
if MiscSettings['plot_each_feature_vs_target']:
make_feature_vs_target_plots()
log.info("Saving clustered data to csv...")
# Add new cluster info to X df
if not clustered_df.empty:
X = pd.concat([X, clustered_df], axis=1)
pd.concat([X, y], 1).to_csv(join(outdir, "clusters.csv"), index=False)
def make_normalizer_selector_dataframe_triples(models):
triples = []
nonlocal y, y_novalidation
for normalizer_name, normalizer_instance in normalizers:
# Run feature normalization
log.info(f"Running normalizer {normalizer_name} ...")
normalizer_instance_y = normalizer_instance
normalizer_instance_y_novalidation = normalizer_instance
# HERE- try to address issue with normalizing non-validation part of dataset
normalizer = normalizer_instance.fit(X_novalidation, y)
X_normalized = normalizer.transform(X)
X_novalidation_normalized = normalizer.transform(X_novalidation)
if conf['MiscSettings']['normalize_target_feature'] is True:
yreshape = pd.DataFrame(np.array(y).reshape(-1, 1))
y_novalidation_new = pd.DataFrame(np.array(y_novalidation).reshape(-1, 1))
y_normalized = normalizer_instance_y.fit_transform(yreshape, yreshape)
y_novalidation_normalized = normalizer_instance_y_novalidation.fit_transform(y_novalidation_new, y_novalidation_new)
y_normalized.columns = [conf['GeneralSetup']['input_target']]
y_novalidation_normalized.columns = [conf['GeneralSetup']['input_target']]
y = pd.Series(np.squeeze(y_normalized), name=conf['GeneralSetup']['input_target'])
y_novalidation = pd.Series(np.squeeze(y_novalidation_normalized), name=conf['GeneralSetup']['input_target'])
else:
normalizer_instance_y = None
log.info("Saving normalized data to csv...")
dirname = join(outdir, normalizer_name)
os.mkdir(dirname)
pd.concat([X_normalized, X_noinput, y], 1).to_csv(join(dirname, "normalized.csv"), index=False)
# Save off the normalizer as .pkl for future import
joblib.dump(normalizer, join(dirname, str(normalizer.__class__.__name__) + ".pkl"))
# HERE- find data twins
# Put learning curve here??
if conf['LearningCurve']:
learning_curve_estimator = conf['LearningCurve']['estimator']
learning_curve_scoring = conf['LearningCurve']['scoring']
n_features_to_select = int(conf['LearningCurve']['n_features_to_select'])
learning_curve_cv = conf['LearningCurve']['cv']
try:
selector_name = conf['LearningCurve']['selector_name']
except KeyError:
selector_name = None
# Get score name from scoring object
scoring_name = learning_curve_scoring._score_func.__name__
scoring_name_nice = ''
for s in scoring_name.split('_'):
scoring_name_nice += s + ' '
# Do sample learning curve
train_sizes, train_mean, test_mean, train_stdev, test_stdev = learning_curve.sample_learning_curve(X=X_novalidation_normalized, y=y_novalidation,
estimator=learning_curve_estimator, cv=learning_curve_cv,
scoring=learning_curve_scoring,
Xgroups=X_grouped_novalidation)
plot_helper.plot_learning_curve(train_sizes, train_mean, test_mean, train_stdev, test_stdev,
scoring_name_nice, 'sample_learning_curve',
join(dirname, f'data_learning_curve'))
# Do feature learning curve
train_sizes, train_mean, test_mean, train_stdev, test_stdev = learning_curve.feature_learning_curve(X=X_novalidation_normalized, y=y_novalidation,
estimator=learning_curve_estimator, cv=learning_curve_cv,
scoring=learning_curve_scoring, selector_name=selector_name,
savepath=dirname,
n_features_to_select=n_features_to_select,
Xgroups=X_grouped_novalidation)
plot_helper.plot_learning_curve(train_sizes, train_mean, test_mean, train_stdev, test_stdev,
scoring_name_nice, 'feature_learning_curve',
join(dirname, f'feature_learning_curve'))
log.info("Running selectors...")
# Run feature selection
for selector_name, selector_instance in selectors:
log.info(f" Running selector {selector_name} ...")
dirname = join(outdir, normalizer_name, selector_name)
os.mkdir(dirname)
# NOTE: Changed from .fit_transform to .fit.transform
# because PCA.fit_transform doesn't call PCA.transform
if selector_instance.__class__.__name__ == 'MASTMLFeatureSelector':
dirname = join(outdir, normalizer_name)
X_selected = selector_instance.fit(X_novalidation_normalized, y_novalidation, dirname, X_grouped_novalidation).transform(X_novalidation_normalized)
elif selector_instance.__class__.__name__ == 'SequentialFeatureSelector':
X_selected = selector_instance.fit(X_novalidation_normalized, y_novalidation).transform(X_novalidation_normalized)
# Need to reset indices in case have test data, otherwise df.equals won't properly find column names
X_novalidation_normalized_reset = X_novalidation_normalized.reset_index()
# SFS renames the columns. Need to replace the column names with correct feature names.
feature_name_dict = dict()
for feature in X_selected.columns.tolist():
for realfeature in X_novalidation_normalized.columns.tolist():
if X_novalidation_normalized_reset[realfeature].equals(X_selected[feature]):
feature_name_dict[feature] = realfeature
X_selected.rename(columns= feature_name_dict, inplace=True)
elif selector_instance.__class__.__name__ == 'PearsonSelector':
X_selected = selector_instance.fit(X=X_novalidation_normalized, savepath=dirname, y=y_novalidation).transform(X_novalidation_normalized)
else:
X_selected = selector_instance.fit(X_novalidation_normalized, y_novalidation).transform(X_novalidation_normalized)
features_selected = X_selected.columns.tolist()
# Need to do this instead of taking X_selected directly because otherwise won't concatenate correctly with test data values, which are
# left out of the feature selection process.
X_selected = X_normalized[features_selected]
log.info(" Saving selected features to csv...")
pd.concat([X_selected, X_noinput, y], 1).to_csv(join(dirname, "selected.csv"), index=False)
#TODO: fix this naming convention
triples.append((normalizer_name, normalizer_instance_y, selector_name, X_selected))
# Run Hyperparam optimization, update model list with optimized model(s)
for hyperopt_name, hyperopt_instance in hyperopts:
try:
log.info(f" Running hyperopt {hyperopt_name} ...")
log.info(f" Saving optimized hyperparams and data to csv...")
dirname = join(outdir, normalizer_name, selector_name, hyperopt_name)
os.mkdir(dirname)
estimator_name = hyperopt_instance._estimator_name
best_estimator = hyperopt_instance.fit(X_selected, y, savepath=os.path.join(dirname, str(estimator_name)+'.csv'))
new_name = estimator_name + '_' + str(normalizer_name) + '_' + str(selector_name) + '_' + str(hyperopt_name)
new_model = best_estimator
models[new_name] = new_model
# Update models list with new hyperparams
#for model in models:
# # model[0] is name, model[1] is instance
# # Check that this particular model long_name had its hyperparams optimized
# for name in hyperopt_params.keys():
# if model[0] in name[:]:
# model[0] = model[0]+'_nonoptimized'
# #model[1] = best_estimator
# # Need to update model as new model name and instance to handle multiple
# # selector/optimization/fitting path
# new_name = model[0]+'_'+str(normalizer_name)+'_'+str(selector_name)+'_'+str(hyperopt_name)
# new_model = best_estimator
# models[new_name] = new_model
except:
raise utils.InvalidValue
return triples
normalizer_selector_dataframe_triples = make_normalizer_selector_dataframe_triples(models=models)
## DataSplits (cross-product)
## Collect grouping columns, splitter_to_groupmes is a dict of splitter name to grouping col
log.debug("Finding splitter-required columns in data...")
def make_splittername_splitlist_pairs():
# exclude the testing_only rows from use in splits
if is_validation:
validation_X = list()
validation_y = list()
for validation_column_name in validation_column_names:
#X_, y_ = _exclude_validation(X, validation_columns[validation_column_name]), _exclude_validation(y, validation_columns[validation_column_name])
validation_X.append(pd.DataFrame(_exclude_validation(X, validation_columns[validation_column_name])))
validation_y.append(pd.DataFrame(_exclude_validation(y, validation_columns[validation_column_name])))
idxy_list = list()
for i, _ in enumerate(validation_y):
idxy_list.append(validation_y[i].index)
# Get intersection of indices between all prediction columns
intersection = reduce(np.intersect1d, (i for i in idxy_list))
X_ = X.iloc[intersection]
y_ = y.iloc[intersection]
else:
X_, y_ = X, y
pairs = []
def fix_index(array):
return X_.index.values[array]
def proper_index(splits):
""" For example, if X's indexs are [1,4,6] and you split
[ [[0],[1,2]], [[1],[0,2]] ] then we would get
[ [[1],[4,6]], [[4],[1,6]] ]
Needed only for valdation row stuff.
"""
return tuple(tuple(fix_index(part) for part in split) for split in splits)
# Collect all the grouping columns, `None` if not needed
splitter_to_group_column = dict()
splitter_to_group_column_no_validation = dict()
for name, instance in splitters:
# if this splitter depends on grouping
if name in splitter_to_group_names:
col = splitter_to_group_names[name]
log.debug(f" Finding {col} for {name}...")
# Locate the grouping column among all dataframes
for df_ in [clustered_df, df, X_]:
if col in df_.columns:
# FOund it!
# Get groups for plotting first
splitter_to_group_column[name] = df_[col].values
if is_validation:
_df_list = list()
if df_ is not clustered_df:
# exclude for df_ so that rows match up in splitter
for validation_column_name in validation_column_names:
df_ = _exclude_validation(df_, validation_columns[validation_column_name])
_df_list.append(df_)
elif df_ is clustered_df:
# merge the cluster data df_ to full df
df[col] = df_
for validation_column_name in validation_column_names:
df_ = _exclude_validation(df, validation_columns[validation_column_name])
_df_list.append(df_)
# Get df_ based on index intersection between all df's in _df_list
idxy_list = list()
for i, _ in enumerate(_df_list):
idxy_list.append(_df_list[i].index)
# Get intersection of indices between all prediction columns
intersection = reduce(np.intersect1d, (i for i in idxy_list))
df_ = df.iloc[intersection]
# and use the no-validation one for the split
grouping_data = df_[col].values
split = proper_index(instance.split(X_, y_, grouping_data))
pairs.append((name, split))
break
# If we didn't find that column anywhere, raise
else:
raise utils.MissingColumnError(f'DataSplit {name} needs column {col}, which '
f'was neither generated nor given by input')
# If we don't need grouping column
else:
splitter_to_group_column[name] = None
split = proper_index(instance.split(X_, y_))
pairs.append((name, split))
return pairs, splitter_to_group_column
splittername_splitlist_pairs, splitter_to_group_column = make_splittername_splitlist_pairs()
log.info("Fitting models to splits...")
def do_models_splits(models, original_models):
models = list(models.items())
original_models = list(original_models.items())
original_model_names = [model[0] for model in original_models]
all_results = []
for normalizer_name, normalizer_instance, selector_name, X in normalizer_selector_dataframe_triples:
subdir = join(outdir, normalizer_name, selector_name)
if MiscSettings['plot_each_feature_vs_target']:
#if selector_name == 'DoNothing': continue
# for each selector/normalizer, plot y against each x column
for column in X:
filename = f'{column}_vs_target.png'
plot_helper.plot_scatter(X[column], y, join(subdir, filename),
xlabel=column, label=y.name)
for model_name, model_instance in models:
#Here, add logic to only run original models and respective models from hyperparam opt
do_split = False
if model_name in original_model_names:
do_split = True
elif (normalizer_name in model_name) and (selector_name in model_name):
do_split = True
if do_split == True:
for splitter_name, trains_tests in splittername_splitlist_pairs:
grouping_data = splitter_to_group_column[splitter_name]
subdir = join(normalizer_name, selector_name, model_name, splitter_name)
log.info(f" Running splits for {subdir}")
subsubdir = join(outdir, subdir)
os.makedirs(subsubdir)
# NOTE: do_one_splitter is a big old function, does lots
runs = do_one_splitter(X, y, model_instance, subsubdir, trains_tests, grouping_data, normalizer_instance)
all_results.extend(runs)
return all_results
return do_models_splits(models, original_models)
def do_one_splitter(X, y, model, main_path, trains_tests, grouping_data, normalizer_instance):
def one_fit(split_num, train_indices, test_indices, normalizer_instance):
log.info(f" Doing split number {split_num}")
train_X, train_y = X.loc[train_indices], y.loc[train_indices]
test_X, test_y = X.loc[test_indices], y.loc[test_indices]
# split up groups into train and test as well
if grouping_data is not None:
train_groups, test_groups = grouping_data[train_indices], grouping_data[test_indices]
else:
train_groups, test_groups = None, None
path = join(main_path, f"split_{split_num}")
os.mkdir(path)
log.info(" Fitting model and making predictions...")
# Catch the ValueError associated with not being able to convert string to float
#try:
#print(train_X, train_y)
# For Keras model, save model summary to main_path and plot training/validation vals vs. epochs
if 'KerasRegressor' in str(model.__class__.__name__):
with open(join(main_path, 'keras_model_summary.txt'), 'w') as f:
with redirect_stdout(f):
model.summary()
history = model.fit(train_X, train_y)
plot_helper.plot_keras_history(model_history=history,
savepath=join(path,'keras_model_accuracy.png'),
plot_type='accuracy')
plot_helper.plot_keras_history(model_history=history,
savepath=join(path, 'keras_model_loss.png'),
plot_type='loss')
pd.DataFrame().from_dict(data=history.history).to_excel(join(path,'keras_model_data.xlsx'))
else:
model.fit(train_X, train_y)
#except ValueError:
# raise utils.InvalidValue('MAST-ML has detected an error with one of your feature vectors which has caused an error'
# ' in model fitting.')
# Save off the trained model as .pkl for future import
# TODO: note that saving keras models has broken with updated keras version
if 'KerasRegressor' not in model.__class__.__name__:
joblib.dump(model, os.path.abspath(join(path, str(model.__class__.__name__)+"_split_"+str(split_num)+".pkl")))
if is_classification:
# For classification, need probabilty of prediction to make accurate ROC curve (and other predictions??).
#TODO:Consider using only predict_proba and not predict() method for classif problems. Have exit escape if probability set to False here.
# See stackoverflow post:
#https: // stats.stackexchange.com / questions / 329857 / what - is -the - difference - between - decision
# - function - predict - proba - and -predict - fun
#params = model.get_params()
#if params['probability'] == True:
try:
train_pred_proba = model.predict_proba(train_X)
test_pred_proba = model.predict_proba(test_X)
except:
log.error('You need to perform classification with model param probability=True enabled for accurate'
' predictions, if your model has the probability param (e.g. RandomForestClassifier does not. '
'Please reset this parameter as applicable and re-run MASTML')
exit()
train_pred = model.predict(train_X)
test_pred = model.predict(test_X)
if 'EnsembleRegressor' in model.__class__.__name__:
test_pred = model.stats_check_models(test_X, test_y)
else:
train_pred = model.predict(train_X)
test_pred = model.predict(test_X)
if 'EnsembleRegressor' in model.__class__.__name__:
test_pred = model.stats_check_models(test_X, test_y)
if train_pred.ndim > 1:
train_pred = np.squeeze(train_pred)
if test_pred.ndim > 1:
test_pred = np.squeeze(test_pred)
if conf['MiscSettings']['normalize_target_feature'] is True:
train_pred = normalizer_instance.inverse_transform(train_pred)
test_pred = normalizer_instance.inverse_transform(test_pred)
train_y = pd.Series(normalizer_instance.inverse_transform(train_y))
test_y = pd.Series(normalizer_instance.inverse_transform(test_y))
# Here- for Random Forest, Extra Trees, and Gradient Boosters output feature importances
if model.__class__.__name__ in ['RandomForestRegressor', 'ExtraTreesRegressor', 'GradientBoostingRegressor']:
pd.concat([pd.DataFrame(X.columns), pd.DataFrame(model.feature_importances_)], 1).to_excel(join(path, str(model.__class__.__name__)+'_featureimportances.xlsx'), index=False)
# here is where we need to collect validation stats
if is_validation:
validation_predictions_list = list()
validation_y_forpred_list = list()
for validation_column_name in validation_column_names:
validation_X_forpred = _only_validation(X, validation_columns[validation_column_name])
validation_y_forpred = _only_validation(y, validation_columns[validation_column_name])
log.info(" Making predictions on prediction_only data...")
validation_predictions = model.predict(validation_X_forpred)
if 'EnsembleRegressor' in model.__class__.__name__:
validation_predictions = model.stats_check_models(validation_X_forpred, validation_y_forpred)
validation_predictions_list.append(validation_predictions)
validation_y_forpred_list.append(validation_y_forpred)
# save them as 'predicitons.csv'
validation_predictions = np.squeeze(validation_predictions)
validation_predictions_series = pd.Series(validation_predictions, name='clean_predictions', index=validation_X_forpred.index)
#validation_noinput_series = pd.Series(X_noinput.index, index=validation_X.index)
pd.concat([validation_X_forpred, validation_y_forpred, validation_predictions_series], 1)\
.to_csv(join(path, 'predictions_'+str(validation_column_name)+'.csv'), index=False)
else:
validation_y = None
# Save train and test data and results to csv:
log.info(" Saving train/test data and predictions to csv...")
train_pred_series = pd.DataFrame(train_pred, columns=['train_pred'], index=train_indices)
train_noinput_series = pd.DataFrame(X_noinput, index=train_indices)
pd.concat([train_X, train_y, train_pred_series, train_noinput_series], 1)\
.to_csv(join(path, 'train.csv'), index=False)
test_pred_series = pd.DataFrame(test_pred, columns=['test_pred'], index=test_indices)
test_noinput_series = pd.DataFrame(X_noinput, index=test_indices)
pd.concat([test_X, test_y, test_pred_series, test_noinput_series], 1)\
.to_csv(join(path, 'test.csv'), index=False)
log.info(" Calculating score metrics...")
split_path = main_path.split(os.sep)
# collect metrics inside a warning catching block for some things we know we should ignore
with warnings.catch_warnings():
# NOTE I tried making this more specific use warnings's regex filter but it would never
# catch it for some indeterminiable reason.
# This warning is raised when you ask for Recall on something from y_true that never
# occors in y_pred. sklearn assumes 0.0, and we want it to do so (silently).
warnings.simplefilter('ignore', UndefinedMetricWarning)
train_metrics = OrderedDict((name, function(train_y, train_pred))
for name, (_, function) in metrics_dict.items())
test_metrics = OrderedDict((name, function(test_y, test_pred))
for name, (_, function) in metrics_dict.items())
# Need to pass y_train data to get rmse/sigma for test rmse and sigma of train y
if 'rmse_over_stdev' in metrics_dict.keys():
test_metrics['rmse_over_stdev'] = metrics_dict['rmse_over_stdev'][1](test_y, test_pred, train_y)
if 'R2_adjusted' in metrics_dict.keys():
test_metrics['R2_adjusted'] = metrics_dict['R2_adjusted'][1](test_y, test_pred, test_X.shape[1])
train_metrics['R2_adjusted'] = metrics_dict['R2_adjusted'][1](train_y, train_pred, train_X.shape[1])
split_result = OrderedDict(
normalizer=split_path[-4],
selector=split_path[-3],
model=split_path[-2],
splitter=split_path[-1],
split_num=split_num,
y_train_true=train_y.values,
y_train_pred=train_pred,
y_test_true=test_y.values,
y_test_pred=test_pred,
train_metrics=train_metrics,
test_metrics=test_metrics,
train_indices=train_indices,
test_indices=test_indices,
train_groups=train_groups,
test_groups=test_groups,
)
if is_validation:
prediction_metrics_list = list()
for validation_column_name, validation_y, validation_predictions in zip(validation_column_names, validation_y_forpred_list, validation_predictions_list):
prediction_metrics = OrderedDict((name, function(validation_y, validation_predictions))
for name, (_, function) in metrics_dict.items())
if 'rmse_over_stdev' in prediction_metrics.keys():
# Correct series passed?
prediction_metrics['rmse_over_stdev'] = metrics_dict['rmse_over_stdev'][1](validation_y, validation_predictions, train_y)
prediction_metrics_list.append(prediction_metrics)
split_result['y_validation_true'+'_'+str(validation_column_name)] = validation_y.values
split_result['y_validation_pred'+'_'+str(validation_column_name)] = validation_predictions
split_result['prediction_metrics'] = prediction_metrics_list
else:
split_result['prediction_metrics'] = None
if is_classification:
split_result['y_train_pred_proba'] = train_pred_proba
split_result['y_test_pred_proba'] = test_pred_proba
log.info(" Making plots...")
if MiscSettings['plot_train_test_plots']:
plot_helper.make_train_test_plots(
split_result, path, is_classification,
label=y.name, model=model, train_X=train_X, test_X=test_X, groups=grouping_data)
if MiscSettings['plot_error_plots']:
if is_validation:
plot_helper.make_error_plots(split_result, path, is_classification,
label=y.name, model=model, train_X=train_X, test_X=test_X,
rf_error_method=MiscSettings['rf_error_method'],
rf_error_percentile=MiscSettings['rf_error_percentile'],
is_validation = is_validation,
validation_column_name = validation_column_name,
validation_X = validation_X_forpred,
groups=grouping_data)
else:
plot_helper.make_error_plots(split_result, path, is_classification,
label=y.name, model=model, train_X=train_X, test_X=test_X,
rf_error_method=MiscSettings['rf_error_method'],
rf_error_percentile=MiscSettings['rf_error_percentile'],
is_validation = is_validation,
validation_column_name = None,
validation_X= None,
groups=grouping_data)
# Write stats in each split path, not main path
if is_validation:
_write_stats_tocsv(split_result['train_metrics'],
split_result['test_metrics'],
path,
split_result['prediction_metrics'],
validation_column_names)
_write_stats(split_result['train_metrics'],
split_result['test_metrics'],
path,
split_result['prediction_metrics'],
validation_column_names)
else:
_write_stats_tocsv(split_result['train_metrics'],
split_result['test_metrics'],
path)
_write_stats(split_result['train_metrics'],
split_result['test_metrics'],
path)
return split_result
split_results = []
for split_num, (train_indices, test_indices) in enumerate(trains_tests):
path = join(main_path, f"split_{split_num}")
if 'EnsembleRegressor' in model.__class__.__name__:
models['EnsembleRegressor'].setup(path)
split_results.append(one_fit(split_num, train_indices, test_indices, normalizer_instance))
log.info(" Calculating mean and stdev of scores...")
def make_train_test_average_and_std_stats():
train_stats = OrderedDict([('Average Train', None)])
test_stats = OrderedDict([('Average Test', None)])
if is_validation:
prediction_stats = list()
num_predictions = len(split_results[0]['prediction_metrics'])
for i in range(num_predictions):
prediction_stats.append(OrderedDict([('Average Prediction', None)]))
for name in metrics_dict:
train_values = [split_result['train_metrics'][name] for split_result in split_results]
test_values = [split_result['test_metrics'][name] for split_result in split_results]
train_stats[name] = (np.mean(train_values), np.std(train_values))
test_stats[name] = (np.mean(test_values), np.std(test_values))
if is_validation:
for i in range(num_predictions):
prediction_values = [split_result['prediction_metrics'][i][name] for split_result in split_results]
prediction_stats[i][name] = (np.mean(prediction_values), np.std(prediction_values))
test_stats_single = dict()
test_stats_single[name] = (np.mean(test_values), np.std(test_values))
if grouping_data is not None:
groups = np.array(split_results[0]['test_groups'].tolist()+split_results[0]['train_groups'].tolist())
unique_groups = np.union1d(split_results[0]['test_groups'], split_results[0]['train_groups'])
plot_helper.plot_metric_vs_group(metric=name, groups=unique_groups, stats=test_values,
avg_stats = test_stats_single, savepath=join(main_path, str(name)+'_vs_group.png'))
plot_helper.plot_metric_vs_group_size(metric=name, groups=groups, stats=test_values,
avg_stats = test_stats_single, savepath=join(main_path, str(name)+'_vs_group_size.png'))
del train_stats['Average Train']
del test_stats['Average Test']
if is_validation:
for i in range(num_predictions):
del prediction_stats[i]['Average Prediction']
return train_stats, test_stats, prediction_stats
else:
return train_stats, test_stats
if is_validation:
avg_train_stats, avg_test_stats, avg_prediction_stats = make_train_test_average_and_std_stats()
# Here- write average stats to main folder of splitter
_write_stats_tocsv(avg_train_stats,
avg_test_stats,
main_path, prediction_metrics=avg_prediction_stats, prediction_names=validation_column_names)
_write_stats(avg_train_stats,
avg_test_stats,
main_path, prediction_metrics=avg_prediction_stats, prediction_names=validation_column_names)
else:
avg_train_stats, avg_test_stats = make_train_test_average_and_std_stats()
# Here- write average stats to main folder of splitter
_write_stats_tocsv(avg_train_stats,
avg_test_stats,
main_path)
_write_stats(avg_train_stats,
avg_test_stats,
main_path)
def make_average_error_plots(main_path):
has_model_errors = False
has_model_errors_validation = False
dfs_cumulative_errors = list()
dfs_cumulative_errors_validation = list()
for split_folder, _, __ in os.walk(main_path):
if "split" in split_folder:
path = join(main_path, split_folder)
try:
dfs_cumulative_errors.append(pd.read_csv(join(path,'test_cumulative_normalized_error.csv')))
if is_validation:
dfs_cumulative_errors_validation.append(pd.read_csv(join(path, 'validation_cumulative_normalized_error.csv')))
except:
pass
# Concatenate all dfs in list to one big df
df_cumulative_errors = pd.concat(dfs_cumulative_errors)
if is_validation:
df_cumulative_errors_validation = pd.concat(dfs_cumulative_errors_validation)
# Need to get average values of df columns by averagin over groups of Y True values (since each Y True should
# only appear once)
# TODO: change this to get values explicitly from each split and then average, as some Y True values may have same value and appear multiple times
df_normalized_errors_avgvalues = df_cumulative_errors.groupby('Y True').mean().reset_index()
y_true = np.array(df_normalized_errors_avgvalues['Y True'])
y_pred = np.array(df_normalized_errors_avgvalues['Y Pred'])
if is_validation:
df_normalized_errors_avgvalues_validation = df_cumulative_errors_validation.groupby('Y True').mean().reset_index()
y_true_validation = np.array(df_normalized_errors_avgvalues_validation['Y True'])
y_pred_validation = np.array(df_normalized_errors_avgvalues_validation['Y Pred'])
try:
average_error_values = np.array(df_normalized_errors_avgvalues['error_bars_down'])
has_model_errors = True
except:
average_error_values = None
has_model_errors = False
if is_validation:
try:
average_error_values_validation = np.array(df_normalized_errors_avgvalues_validation['error_bars_down'])
has_model_errors_validation = True
except:
average_error_values_validation = None
has_model_errors_validation = False
plot_helper.plot_average_cumulative_normalized_error(y_true=y_true, y_pred=y_pred,
savepath=join(main_path,'test_cumulative_normalized_error_average_allsplits.png'),
has_model_errors=has_model_errors,
err_avg=average_error_values)
# Here- plot predicted vs real errors for all splits, only if using RF, GBR, GPR, or ET
if model.__class__.__name__ in ['RandomForestRegressor', 'ExtraTreesRegressor', 'GradientBoostingRegressor', 'GaussianProcessRegressor', 'EnsembleRegressor']:
plot_helper.plot_real_vs_predicted_error(y_true, main_path, model, data_test_type='test')
if is_validation:
plot_helper.plot_average_cumulative_normalized_error(y_true=y_true_validation, y_pred=y_pred_validation,
savepath=join(main_path,
'validation_cumulative_normalized_error_average_allsplits.png'),
has_model_errors=has_model_errors_validation,
err_avg=average_error_values_validation)
# Here- plot predicted vs real errors for all splits
# Use y_true here because want to normalize to full training dataset stdev
if model.__class__.__name__ in ['RandomForestRegressor', 'ExtraTreesRegressor',
'GradientBoostingRegressor', 'GaussianProcessRegressor', 'EnsembleRegressor']:
plot_helper.plot_real_vs_predicted_error(y_true, main_path, model, data_test_type='validation')
plot_helper.plot_average_normalized_error(y_true=y_true, y_pred=y_pred,
savepath=join(main_path,'test_normalized_error_average_allsplits.png'),
has_model_errors=has_model_errors,
err_avg=average_error_values)
return
# Call to make average error plots
if conf['MiscSettings']['plot_error_plots']:
log.info(" Making average error plots over all splits")
if 'NoSplit' not in main_path:
make_average_error_plots(main_path=main_path)
log.info(" Making best/worst plots...")
def get_best_worst_median_runs():
# sort splits by the test score of first metric:
greater_is_better, _ = next(iter(metrics_dict.values())) # get first value pair
scalar = 1 if greater_is_better else -1
s = sorted(split_results, key=lambda run: scalar*next(iter(run['test_metrics'])))
return s[0], s[len(split_results)//2], s[-1]
worst, median, best = get_best_worst_median_runs()
def make_pred_vs_true_plots(model, y):
if conf['MiscSettings']['normalize_target_feature'] == True:
y = pd.Series(normalizer_instance.inverse_transform(y), name=conf['GeneralSetup']['input_target'])
if MiscSettings['plot_predicted_vs_true']:
plot_helper.plot_best_worst_split(y.values, best, worst,
join(main_path, 'best_worst_split'), label=conf['GeneralSetup']['input_target'])
predictions = [[] for _ in range(X.shape[0])]
for split_num, (train_indices, test_indices) in enumerate(trains_tests):
for i, pred in zip(test_indices, split_results[split_num]['y_test_pred']):
predictions[i].append(pred)
if MiscSettings['plot_predicted_vs_true_average']:
plot_helper.plot_predicted_vs_true_bars(
y.values, predictions, avg_test_stats,
join(main_path, 'predicted_vs_true_average'), label=conf['GeneralSetup']['input_target'])
if grouping_data is not None:
plot_helper.plot_predicted_vs_true_bars(
y.values, predictions, avg_test_stats,
join(main_path, 'predicted_vs_true_average_groupslabeled'),
label=conf['GeneralSetup']['input_target'],
groups=grouping_data)
if MiscSettings['plot_best_worst_per_point']:
plot_helper.plot_best_worst_per_point(y.values, predictions,
join(main_path, 'best_worst_per_point'),
metrics_dict, avg_test_stats, label=conf['GeneralSetup']['input_target'])
if not is_classification:
make_pred_vs_true_plots(model=model, y=y)
return split_results
runs = do_all_combos(X, y, df) # calls do_one_splitter internally
log.info("Making image html file...")
html_helper.make_html(outdir)
log.info("Making html file of all runs stats...")
_save_all_runs(runs, outdir)
# Here- do DLHub model hosting if have section
if bool(conf['ModelHosting']) != False: # dict is empty
model_hosting.host_model(model_path=conf['ModelHosting']['model_path'],
preprocessor_path=conf['ModelHosting']['preprocessor_path'],
training_data_path=conf['ModelHosting']['training_data_path'],
model_title=conf['ModelHosting']['model_title'],
model_name=conf['ModelHosting']['model_name'],
model_type="scikit-learn")
log.info('Finished uploading model to DLHub...')
log.info('Your MAST-ML run has finished successfully!')
return
def _instantiate(kwargs_dict, name_to_constructor, category, X_grouped=None, X_indices=None):
"""
Uses name_to_constructor to instantiate every item in kwargs_dict and return
the list of instantiations
"""
instantiations = []
for long_name, (name, kwargs) in kwargs_dict.items():
log.debug(f'instantiation: {long_name}, {name}({kwargs})')
try:
#skip instantiate step for keras model because need to pass dict to build model and not all values directly
if 'KerasRegressor' in long_name:
pass
# Need to construct cv object when have special case of RFECV and LeaveOneGroupOut cross-validation!
elif name == 'RFECV':
if 'cv' in kwargs.keys():
if X_grouped is not None:
if kwargs['cv'].__class__.__name__ == 'LeaveOneGroupOut':
trains = list()
tests = list()
for train_idx, test_idx in LeaveOneGroupOut().split(X=X_indices, y=None, groups=X_grouped):
trains.append(train_idx)
tests.append(test_idx)
custom_cv = zip(trains, tests)
kwargs['cv'] = custom_cv
instantiations.append([long_name, name_to_constructor[name](**kwargs)])
else:
instantiations.append([long_name, name_to_constructor[name](**kwargs)])
except TypeError:
log.info(f"ARGUMENTS FOR '{name}': {inspect.signature(name_to_constructor[name])}")
raise utils.InvalidConfParameters(
f"The {category} '{name}' has invalid parameters: {kwargs}\n"
f"Signature for '{name}': {inspect.signature(name_to_constructor[name])}")
except KeyError:
raise utils.InvalidConfSubSection(
f"There is no {category} called '{name}'."
f"All valid {category}: {list(name_to_constructor.keys())}")
return instantiations
def _grouping_column_to_group_number(X_grouped):
group_list = X_grouped.values.reshape((1, -1))
unique_groups = np.unique(group_list).tolist()
group_dict = dict()
group_list_asnumber = list()
for i, group in enumerate(unique_groups):
group_dict[group] = i+1
for i, group in enumerate(group_list.tolist()[0]):
group_list_asnumber.append(group_dict[group])
X_grouped_asnumber = np.asarray(group_list_asnumber)
return X_grouped_asnumber
def _snatch_models(models, conf_feature_selection):
models = OrderedDict(models)
log.debug(f'models, pre-snatching: \n{models}')
for selector_name, [_, args_dict] in conf_feature_selection.items():
if 'estimator' in args_dict:
model_name = args_dict['estimator']
try:
args_dict['estimator'] = models[model_name]
del models[model_name]
except KeyError:
raise utils.MastError(f"The selector {selector_name} specified model {model_name},"
f"which was not found in the [Models] section")
log.debug(f'models, post-snatching: \n{models}')
return models
def _snatch_keras_model(models, conf_models):
for model in conf_models.keys():
if 'KerasRegressor' in model:
keras_model = model_finder.KerasRegressor(conf_models[model][1])
models[model] = keras_model
return models
def _snatch_gpr_model(models, conf_models):
models = OrderedDict(models)
models_orig = deepcopy(models)
for model in models_orig.keys():
if 'GaussianProcessRegressor' in model or 'GaussianProcessClassifier' in model:
import sklearn.gaussian_process
from sklearn.gaussian_process import GaussianProcessRegressor, GaussianProcessClassifier
kernel_list = ['WhiteKernel', 'RBF', 'ConstantKernel', 'Matern', 'RationalQuadratic', 'ExpSineSquared', 'DotProduct']
kernel_operators = ['+', '*', '-']
params = conf_models[model]
kernel_string = params[1]['kernel']
# Need to delete old kernel (as str) from params so can use other specified params in new GPR model
del params[1]['kernel']
# Parse kernel_string to identify kernel types and any kernel operations to combine kernels
kernel_types_asstr = list()
kernel_types_ascls = list()
kernel_operators_used = list()
for s in kernel_string[:]:
if s in kernel_operators:
kernel_operators_used.append(s)
# Do case for single kernel, no operators
if len(kernel_operators_used) == 0:
kernel_types_asstr.append(kernel_string)
else:
# New method, using re
unique_operators = np.unique(kernel_operators_used).tolist()
unique_operators_asstr = '['
for i in unique_operators:
unique_operators_asstr += str(i)
unique_operators_asstr += ']'
kernel_types_asstr = re.split(unique_operators_asstr, kernel_string)
for kernel in kernel_types_asstr:
kernel_ = getattr(sklearn.gaussian_process.kernels, kernel)
kernel_types_ascls.append(kernel_())
# Case for single kernel
if len(kernel_types_ascls) == 1:
kernel = kernel_types_ascls[0]
kernel_count = 0
for i, operator in enumerate(kernel_operators_used):
if i+1 <= len(kernel_operators_used):
if operator == "+":
if kernel_count == 0:
kernel = kernel_types_ascls[kernel_count] + kernel_types_ascls[kernel_count+1]
else:
kernel += kernel_types_ascls[kernel_count+1]
elif operator == "*":
if kernel_count == 0:
kernel = kernel_types_ascls[kernel_count] * kernel_types_ascls[kernel_count+1]
else:
kernel *= kernel_types_ascls[kernel_count+1]
else:
logging.warning('You have chosen an invalid operator to construct a composite kernel. Please choose'
' either "+" or "*".')
kernel_count += 1
if 'GaussianProcessRegressor' in model:
gpr = GaussianProcessRegressor(kernel=kernel, **params[1])
# Need to delete old GPR from model list and replace with new GPR with correct kernel and other params.
del models[model]
models[model] = gpr
elif 'GaussianProcessClassifier' in model:
gpc = GaussianProcessClassifier(kernel=kernel, **params[1])
# Need to delete old GPC from model list and replace with new GPC with correct kernel and other params.
del models[model]
models[model] = gpc
return models
def _snatch_models_cv_for_hyperopt(conf, models, splitters, is_classification):
models = list(models.items())
if conf['HyperOpt']:
for searchtype, searchparams in conf['HyperOpt'].items():
for paramtype, paramvalue in searchparams[1].items():
if paramtype == 'estimator':
# Need to grab model and params from Model section of conf file
found_model = False
for model in models:
if model[0] == paramvalue:
conf['HyperOpt'][searchtype][1]['estimator'] = model[1]
found_model = True
break
if found_model == False:
raise utils.MastError(f"The estimator {paramvalue} could not be found in the input file!")
if paramtype == 'cv':
# Need to grab cv and params from DataSplits section of conf file
found_cv = False
for splitter in splitters:
if splitter[0] == paramvalue:
conf['HyperOpt'][searchtype][1]['cv'] = splitter[1]
found_cv = True
break
if found_cv == False:
raise utils.MastError(f"The cv object {paramvalue} could not be found in the input file!")
if paramtype == 'scoring':
# Need to grab correct scoring object
found_scorer = False
if is_classification:
metrics_dict = metrics.classification_metrics
else:
metrics_dict = metrics.regression_metrics
if paramvalue in metrics_dict.keys():
conf['HyperOpt'][searchtype][1]['scoring'] = make_scorer(metrics_dict[paramvalue][1],
greater_is_better=metrics_dict[paramvalue][0])
found_scorer = True
break
if found_scorer == False:
raise utils.MastError(
f"The scoring object {paramvalue} could not be found in the input file!")
return conf['HyperOpt']
def _snatch_splitters(splitters, conf_feature_selection):
log.debug(f'cv, pre-snatching: \n{splitters}')
for selector_name, (_, args_dict) in conf_feature_selection.items():
# Here: add snatch to cv object for feature selection with RFECV
if 'cv' in args_dict:
cv_name = args_dict['cv']
try:
args_dict['cv'] = splitters[cv_name]
del splitters[cv_name]
except KeyError:
raise utils.MastError(f"The selector {selector_name} specified cv splitter {cv_name},"
f"which was not found in the [DataSplits] section")
log.debug(f'cv, post-snatching: \n{splitters}')
def _extract_grouping_column_names(splitter_to_kwargs):
splitter_to_group_names = dict()
for splitter_name, name_and_kwargs in splitter_to_kwargs.items():
_, kwargs = name_and_kwargs
if 'grouping_column' in kwargs:
column_name = kwargs['grouping_column']
del kwargs['grouping_column'] # because the splitter doesn't actually take this
splitter_to_group_names[splitter_name] = column_name
return splitter_to_group_names
def _remove_constant_features(df):
log.info("Removing constant features, regardless of feature selectors.")
before = set(df.columns)
df = df.loc[:, (df != df.iloc[0]).any()]
removed = list(before - set(df.columns))
if removed != []:
log.warning(f'Removed {len(removed)}/{len(before)} constant columns.')
log.debug("Removed the following constant columns: " + str(removed))
return df
def _save_all_runs(runs, outdir):
"""
Produces a giant html table of all stats for all runs
"""
table = []
for run in runs:
od = OrderedDict()
for name, value in run.items():
if name == 'train_metrics':
for k, v in run['train_metrics'].items():
od['train_'+k] = v
elif name == 'test_metrics':
for k, v in run['test_metrics'].items():
od['test_'+k] = v
else:
od[name] = value
table.append(od)
pd.DataFrame(table).to_html(join(outdir, 'all_runs_table.html'))
def _write_stats(train_metrics, test_metrics, outdir, prediction_metrics=None, prediction_names=None):
with open(join(outdir, 'stats_summary.txt'), 'w') as f:
f.write("TRAIN:\n")
for name,score in train_metrics.items():
if type(score) == tuple:
f.write(f"{name}: {'%.3f'%float(score[0])} +/- {'%.3f'%float(score[1])}\n")
else:
f.write(f"{name}: {'%.3f'%float(score)}\n")
f.write("TEST:\n")
for name,score in test_metrics.items():
if type(score) == tuple:
f.write(f"{name}: {'%.3f'%float(score[0])} +/- {'%.3f'%float(score[1])}\n")
else:
f.write(f"{name}: {'%.3f'%float(score)}\n")
if prediction_metrics:
#prediction metrics now list of dicts for predicting multiple values
for prediction_metric, prediction_name in zip(prediction_metrics, prediction_names):
f.write("PREDICTION for "+str(prediction_name)+":\n")
for name, score in prediction_metric.items():
if type(score) == tuple:
f.write(f"{name}: {'%.3f'%float(score[0])} +/- {'%.3f'%float(score[1])}\n")
else:
f.write(f"{name}: {'%.3f'%float(score)}\n")
def _write_stats_tocsv(train_metrics, test_metrics, outdir, prediction_metrics=None, prediction_names=None):
datadict = dict()
for name, score in train_metrics.items():
if type(score) == tuple:
datadict[name+' train score'] = float(score[0])
datadict[name+' train stdev'] = float(score[1])
else:
datadict[name+' train score'] = float(score)
for name, score in test_metrics.items():
if type(score) == tuple:
datadict[name+' validation score'] = float(score[0])
datadict[name+' validation stdev'] = float(score[1])
else:
datadict[name+' validation score'] = float(score)
if prediction_names:
for prediction_metric, prediction_name in zip(prediction_metrics, prediction_names):
for name, score in prediction_metric.items():
if type(score) == tuple:
datadict[prediction_name+' '+name+' score'] = float(score[0])
datadict[prediction_name+' '+name+' stdev'] = float(score[1])
else:
datadict[prediction_name+' '+name+' score'] = float(score)
pd.DataFrame().from_dict(data=datadict, orient='index').to_csv(join(outdir, 'stats_summary.csv'))
return
def _exclude_validation(df, validation_column):
return df.loc[validation_column != 1]
def _only_validation(df, validation_column):
return df.loc[validation_column == 1]
[docs]def check_paths(conf_path, data_path, outdir):
"""
This method is responsible for error handling of the user-specified paths for the configuration file, data file,
and output directory.
Args:
conf_path: (str), the path supplied by the user which contains the input configuration file
data_path: (str), the path supplied by the user which contains the input data file (as CSV or XLSX)
outdir: (str), the path supplied by the user which determines where the output results are saved to
Returns:
conf_path: (str), the path supplied by the user which contains the input configuration file
data_path: (str), the path supplied by the user which contains the input data file (as CSV or XLSX)
outdir: (str), the path supplied by the user which determines where the output results are saved to
"""
# Check conf path:
if type(conf_path) is str:
if os.path.splitext(conf_path)[1] != '.conf':
raise utils.FiletypeError(f"Conf file does not end in .conf: '{conf_path}'")
if not os.path.isfile(conf_path):
raise utils.FileNotFoundError(f"No such file: {conf_path}")
elif type(conf_path) is dict:
pass
else:
raise TypeError('Your conf_path must be either a string to .conf file path or a dict')
# Check data path:
if type(data_path) is str:
if os.path.splitext(data_path)[1] not in ['.csv', '.xlsx']:
raise utils.FiletypeError(f"Data file does not end in .csv or .xlsx: '{data_path}'")
if not os.path.isfile(data_path):
raise utils.FileNotFoundError(f"No such file: {data_path}")
elif type(data_path) is type(pd.DataFrame()):
pass
else:
raise TypeError('Your data_path must be either a string to .csv or .xlsx data file or a pd.DataFrame object')
# Check output directory:
if os.path.exists(outdir):
try:
os.rmdir(outdir) # succeeds if empty
except OSError: # directory not empty
log.warning(f"{outdir} not empty. Renaming...")
now = datetime.now()
outdir = outdir.rstrip(os.sep) # remove trailing slash
outdir = f"{outdir}_{now.month:02d}_{now.day:02d}" \
f"_{now.hour:02d}_{now.minute:02d}_{now.second:02d}"
os.makedirs(outdir)
log.info(f"Saving to directory '{outdir}'")
return conf_path, data_path, outdir
[docs]def get_commandline_args():
"""
This method is responsible for parsing and checking the command-line execution of MAST-ML inputted by the user.
Args:
None
Returns:
(str), the path supplied by the user which contains the input configuration file
(str), the path supplied by the user which contains the input data file (as CSV or XLSX)
(str), the path supplied by the user which determines where the output results are saved to
verbosity: (int), the verbosity level of the MAST-ML log, which determines the amount of information writtent to the log.
"""
parser = argparse.ArgumentParser(description='MAterials Science Toolkit - Machine Learning')
parser.add_argument('conf_path', type=str, help='path to mastml .conf file')
parser.add_argument('data_path', type=str, help='path to csv or xlsx file')
parser.add_argument('-o', action="store", dest='outdir', default='results',
help='Folder path to save output files to. Defaults to results/')
# from https://stackoverflow.com/a/14763540
# we only use them to set a bool but it would be nice to have multiple levels in the future
parser.add_argument('-v', '--verbosity', action="count",
help="include this flag for more verbose output")
parser.add_argument('-q', '--quietness', action="count",
help="include this flag to hide [DEBUG] printouts, or twice to hide [INFO]")
args = parser.parse_args()
verbosity = (args.verbosity if args.verbosity else 0)\
- (args.quietness if args.quietness else 0)
# verbosity -= 1 ## uncomment this for distribution
return (os.path.abspath(args.conf_path),
os.path.abspath(args.data_path),
os.path.abspath(args.outdir),
verbosity)
if __name__ == '__main__':
conf_path, data_path, outdir, verbosity = get_commandline_args()
main(conf_path, data_path, outdir, verbosity)