"""
Module for constructing models for use in MAST-ML.
SklearnModel:
Class that wraps scikit-learn models to have MAST-ML type functionality. Providing the model name as a string
and the keyword arguments for the model parameters will construct the model. Note that this class also supports
construction of XGBoost models and Keras neural network models via Keras' keras.wrappers.scikit_learn.KerasRegressor
model.
EnsembleModel:
Class that constructs a model which is an ensemble of many base models (sometimes called weak learners). This
class supports construction of ensembles of most scikit-learn regression models as well as ensembles of neural
networks that are made via Keras' keras.wrappers.scikit_learn.KerasRegressor class.
CrabNetModel:
Class that provides an implementation of PyTorch-based CrabNet regressor model based on the following work:
Wang, A., Kauwe, S., Murdock, R., Sparks, T. "Compositionally restricted attention-based network for
" materials property predictions", npj Computational Materials (2021) (https://www.nature.com/articles/s41524-021-00545-1)
"""
import pandas as pd
import sklearn.base
import sklearn.utils
from sklearn.ensemble import BaggingRegressor
from sklearn.gaussian_process import GaussianProcessRegressor
import inspect
from pprint import pprint
import re
import subprocess
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import train_test_split
try:
import transfernet as trans
import torch
except:
print('transfernet is an optional dependency. If you want to use transfer learning pytorch NNs, do'
' "pip install transfernet"')
try:
import xgboost
except:
print('XGBoost is an optional dependency. If you want to use XGBoost models, please manually install xgboost package with '
'pip install xgboost. If have error with finding libxgboost.dylib library, do'
'brew install libomp. If do not have brew on your system, first do'
' ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" from the Terminal')
try:
from sklego.linear_model import LowessRegression
except:
print('scikit-lego is an optional dependency, enabling use of the LowessRegression model. If you want to use this model, '
'do "pip install scikit-lego"')
try:
from lineartree import LinearTreeRegressor, LinearForestRegressor, LinearBoostingRegressor
except:
print('linear-tree is an optional dependency, enabling use of Linear tree, forest, and boosting models. If you want'
' to use this model, do "pip install linear-tree"')
try:
from gplearn.genetic import SymbolicRegressor
except:
print('gplearn is an optional dependency, enabling the use of genetic programming SymbolicRegressor model. If you'
' want to use this model, do "pip install gplearn"')
try:
from kan import *
except:
print('pykan is an optional dependency, enabling use of Kolmogorov-Arnold Networks (KANs). If you want to use'
' this mode, do "pip install pykan". As of 9/27/24, need pykan==0.0.5')
[docs]
class SklearnModel(BaseEstimator, TransformerMixin):
"""
Class to wrap any sklearn estimator, and provide some new dataframe functionality
Args:
model: (str), string denoting the name of an sklearn estimator object, e.g. KernelRidge
kwargs: keyword pairs of values to include for model, e.g. for KernelRidge can specify kernel, alpha, gamma values
Methods:
fit: method that fits the model parameters to the provided training data
Args:
X: (pd.DataFrame), dataframe of X features
y: (pd.Series), series of y target data
Returns:
fitted model
predict: method that evaluates model on new data to give predictions
Args:
X: (pd.DataFrame), dataframe of X features
as_frame: (bool), whether to return data as pandas dataframe (else numpy array)
Returns:
series or array of predicted values
help: method to output key information on class use, e.g. methods and parameters
Args:
None
Returns:
None, but outputs help to screen
"""
def __init__(self, model, **kwargs):
if model == 'XGBoostRegressor':
self.model = xgboost.XGBRegressor(**kwargs)
elif model == 'GaussianProcessRegressor':
kernel = kwargs['kernel']
kernel = _make_gpr_kernel(kernel_string=kernel)
del kwargs['kernel']
self.model = GaussianProcessRegressor(kernel=kernel, **kwargs)
elif model == 'LowessRegression':
self.model = LowessRegression(**kwargs)
elif model == 'LinearTreeRegressor':
self.model = LinearTreeRegressor(**kwargs)
elif model == 'LinearForestRegressor':
self.model = LinearForestRegressor(**kwargs)
elif model == 'LinearBoostingRegressor':
self.model = LinearBoostingRegressor(**kwargs)
elif model == 'SymbolicRegressor':
self.model = SymbolicRegressor(**kwargs)
else:
self.model = dict(sklearn.utils.all_estimators())[model](**kwargs)
[docs]
def fit(self, X, y):
return self.model.fit(X, y)
[docs]
def predict(self, X, as_frame=True):
if as_frame == True:
return pd.DataFrame(self.model.predict(X), columns=['y_pred']).squeeze()
else:
return self.model.predict(X).ravel()
def predict_proba(self, X):
if hasattr(self.model, 'predict_proba'):
return self.model.predict_proba(X)
[docs]
def get_params(self, deep=True):
return self.model.get_params(deep)
[docs]
def help(self):
print('Documentation for', self.model)
pprint(dict(inspect.getmembers(self.model))['__doc__'])
print('\n')
print('Class methods for,', self.model)
pprint(dict(inspect.getmembers(self.model, predicate=inspect.ismethod)))
print('\n')
print('Class attributes for,', self.model)
pprint(self.model.__dict__)
return
[docs]
class KANModel(KAN):
'''
Implementation of Kolmogorov-Arnold Networks (KANs) from the following work:
Liu, Z., Wang, Y., Vaidya, S., Ruehle, F. Halverson, J., Soljacic, M. Hou, T. Y., Tegmark, M.
"KAN: Kolmogorov-Arnold Networks", arXiv (2024) (https://arxiv.org/abs/2404.19756)
Information on input parameters taken from pykan Github: https://github.com/KindXiaoming/pykan
Args:
width (list of int): list of integers specifying the network architecture. For regression problems, the first number is
equal to the number of input features, the last number is the output number of nodes (= 1), and the intermediate
numbers determine the number of nodes in hidden layers. Default is N - 2N+1 - 1 following the KA theorem, where
N = number of input features
grid (int): The number of grid intervals
k (int): The order of piecewise polynomial
steps (int): Number of KAN training steps. Similar to epochs for MLPs
seed (int): the input seed. Defaults to 0 so need to set new seed for each split to have random start
savepath (str): path to save the output model
opt (str): optimization method. Possibilities include "LBFGS", or "Adam"
lamb (float): overall penalty strength
lamb_entropy (float): entropy penalty strength
Methods:
fit: method that fits the model parameters to the provided training data
Args:
X: (pd.DataFrame), dataframe of X data used for model training
y: (pd.Series), series of y target data
Returns:
fitted model
predict: method that evaluates model on new data to give predictions
Args:
X: (pd.DataFrame), dataframe of X data used for model testing
as_frame: (bool), whether to return data as pandas dataframe (else numpy array)
Returns:
series or array of predicted values
'''
def __init__(self, width, grid=3, k=3, steps=20, seed=None, savepath=None, opt='LBFGS', lamb=0.01, lamb_entropy=10):
self.width = width
self.grid = grid
self.k = k
self.steps = steps
self.seed = seed
self.savepath = savepath
self.opt = opt
self.lamb = lamb
self.lamb_entropy = lamb_entropy
super(KANModel, self).__init__(width=self.width, grid=self.grid, k=self.k)
[docs]
def fit(self, X, y):
# Need to re-instantiate the model at each fit, otherwise data leakage will occur e.g., when doing 5fold CV
super(KANModel, self).__init__(width=self.width, grid=self.grid, k=self.k)
if self.seed is None:
# get new random seed
seed = np.random.randint(0, 1000000000, 1)[0]
self.seed = seed
if self.width is None:
# set default width
N = X.shape[1]
self.width = [N, 2*N+1, 1]
# Make data into torch tensors
import torch
dataset = {'train_input': torch.from_numpy(np.array(X)),
'test_input': torch.from_numpy(np.array(X)),
'train_label': torch.from_numpy(np.array(y).reshape(-1, 1)),
'test_label': torch.from_numpy(np.array(y).reshape(-1, 1))}
self(dataset['train_input'])
self.train(dataset, opt=self.opt, steps=self.steps, lamb=self.lamb, lamb_entropy=self.lamb_entropy)
return
[docs]
def predict(self, X, as_frame=True):
dataset = {'test_input': torch.from_numpy(np.array(X))}
preds = self(dataset['test_input']).detach().numpy().ravel()
if as_frame == True:
return pd.DataFrame(preds, columns=['y_pred']).squeeze()
else:
return preds
[docs]
class SourceNN:
def __init__(
self,
source_arch,
nn_params,
val_size=None,
test_size=None,
savepath='source_model',
):
self.source_arch = source_arch # NN Architecture
self.nn_params = nn_params # Fitting marameters like lr and batch size
self.val_size = val_size
self.test_size = test_size
self.savepath = savepath # The location to save results
[docs]
def fit(self, X_train, y_train):
# Split source domain into train and validation
if self.val_size is not None:
splits = train_test_split(
X_train,
y_train,
test_size=self.val_size,
)
X_train, X_val, y_train, y_val = splits
# Split source domain validation to get test set
if self.test_size is not None:
splits = train_test_split(
X_val,
y_val,
test_size=self.test_size,
)
X_val, X_test, y_val, y_test = splits
else:
X_test = y_test = None
else:
X_val = y_val = X_test = y_test = None
# Fit the NN
out = trans.utils.fit(
self.source_arch,
X_train,
y_train,
X_val=X_val,
y_val=y_val,
X_test=X_test,
y_test=y_test,
save_dir=self.savepath+'/source',
**self.nn_params,
)
self.source_model = out[1] # The fitted NN
self.source_model = model_wrapper(self.source_model)
[docs]
def predict(self, X):
return self.source_model.predict(X)
[docs]
class Transfer:
def __init__(
self,
prefit_path,
nn_params=None,
val_size=None,
test_size=None,
freeze_n_layers=None,
cover_model=None,
savepath='transfer_model',
):
self.nn_params = nn_params # Fitting marameters like lr and batch size
self.val_size = val_size
self.test_size = test_size
self.freeze_n_layers = freeze_n_layers
self.cover_model = cover_model
self.prefit_path = prefit_path
self.savepath = savepath # The location to save results
[docs]
def fit(self, X_train, y_train):
# Split source domain into train and validation
if self.val_size is not None:
splits = train_test_split(
X_train,
y_train,
test_size=self.val_size,
)
X_train, X_val, y_train, y_val = splits
# Split source domain validation to get test set
if self.test_size is not None:
splits = train_test_split(
X_val,
y_val,
test_size=self.test_size,
)
X_val, X_test, y_val, y_test = splits
else:
X_test = y_test = None
else:
X_val = y_val = X_test = y_test = None
# Chose defalut device
if torch.cuda.is_available():
device = torch.device('cuda')
else:
device = torch.device('cpu')
# Load a prefit NN model
self.source_model = torch.load(
self.prefit_path,
map_location=torch.device(device),
)
# Use final hidden layer of NN as features
if self.cover_model is not None:
self.target_model = trans.models.AppendModel(
self.source_model,
self.cover_model,
)
self.target_model.fit(X_train, y_train)
if all([X_test is not None, y_test is not None]):
y_pred = self.target_model.predict(X_test)
df = pd.DataFrame()
df['y'] = y_test.ravel()
df['y_pred'] = y_pred
df['set'] = 'test'
name = self.savepath+'/transfer_by_appending_model'
os.makedirs(name, exist_ok=True)
trans.plots.parity(df, os.path.join(name, 'parity'))
# Alternatively freeze layers of NN
elif self.freeze_n_layers is not None:
name = '/transfer_by_freeze_{}_layers'.format(self.freeze_n_layers)
out = trans.utils.fit(
self.source_model, # Start from original model
X_train,
y_train,
X_val=X_val,
y_val=y_val,
X_test=X_test,
y_test=y_test,
save_dir=self.savepath+name,
freeze_n_layers=self.freeze_n_layers,
**self.nn_params,
)
self.target_model = out[1]
self.target_model = model_wrapper(self.target_model)
return self.target_model
[docs]
def predict(self, X):
return self.target_model.predict(X)
[docs]
class model_wrapper:
'''
Wrapper for pytorch model to include predict method
'''
def __init__(self, model):
self.model = model
[docs]
def predict(self, X):
# Chose defalut device
if torch.cuda.is_available():
device = torch.device('cuda')
else:
device = torch.device('cpu')
X = trans.utils.to_tensor(X, device)
pred = self.model(X) # If model object passed
pred = pred.cpu().detach().view(-1).numpy()
return pred
[docs]
class HostedModel():
def __init__(self, container_name):
self.container_name = container_name
[docs]
def predict(self, df):
df.to_csv('./test.csv', index=False)
command = 'udocker --allow-root run -v '
command += '{}:/mnt '.format(os.getcwd())
command += self.container_name
command += ' python3 predict.py'
subprocess.check_output(
command,
shell=True
)
df = pd.read_csv('./prediction.csv')
os.remove('./test.csv')
os.remove('./prediction.csv')
return df
[docs]
class EnsembleModel(BaseEstimator, TransformerMixin):
"""
Class used to construct ensemble models with a particular number and type of weak learner (base model). The
ensemble model is compatible with most scikit-learn regressor models and KerasRegressor models
Args:
model: (str), string name denoting the name of the model type to use as the base model
n_estimators: (int), the number of base models to include in the ensemble
kwargs: keyword arguments for the base model parameter names and values
Methods:
fit: method that fits the model parameters to the provided training data
Args:
X: (pd.DataFrame), dataframe of X features
y: (pd.Series), series of y target data
Returns:
fitted model
predict: method that evaluates model on new data to give predictions
Args:
X: (pd.DataFrame), dataframe of X features
as_frame: (bool), whether to return data as pandas dataframe (else numpy array)
Returns:
series or array of predicted values
get_params: method to output key model parameters
Args:
deep: (bool), determines the extent of information returned, default True
Returns:
information on model parameters
"""
def __init__(self, model, n_estimators, **kwargs):
super(EnsembleModel, self).__init__()
try:
if model == 'XGBoostRegressor':
model = xgboost.XGBRegressor(**kwargs)
elif model == 'GaussianProcessRegressor':
kernel = kwargs['kernel']
kernel = _make_gpr_kernel(kernel_string=kernel)
del kwargs['kernel']
model = GaussianProcessRegressor(kernel=kernel, **kwargs)
elif model == 'KANModel':
model = KANModel(**kwargs)
else:
model = dict(sklearn.utils.all_estimators())[model](**kwargs)
except:
print('Could not find designated model type in scikit-learn model library. Note the other supported model'
'type is the keras.wrappers.scikit_learn.KerasRegressor model')
self.n_estimators = n_estimators
self.model = BaggingRegressor(estimator=model, n_estimators=self.n_estimators)
self.base_estimator_ = model.__class__.__name__
[docs]
def fit(self, X, y):
return self.model.fit(X, y)
[docs]
def predict(self, X, as_frame=True):
if as_frame == True:
return pd.DataFrame(self.model.predict(X), columns=['y_pred']).squeeze()
else:
return self.model.predict(X).ravel()
[docs]
def get_params(self, deep=True):
return self.model.get_params(deep)
[docs]
class CrabNetModel(BaseEstimator, TransformerMixin):
'''
Implementation of PyTorch-based CrabNet regressor model based on the following work:
Wang, A., Kauwe, S., Murdock, R., Sparks, T. "Compositionally restricted attention-based network for
" materials property predictions", npj Computational Materials (2021) (https://www.nature.com/articles/s41524-021-00545-1)
The code to run CrabNet was integrated into MAST-ML based on source files available in this repository: https://github.com/anthony-wang/CrabNet
Args:
composition_column (str): string denoting the name of an input column containing materials compositions
epochs (int): number of training epochs
val_frac (float): fraction of input data to use as validation (0 to 1). Note this can be set to 0 for most MAST-ML
runs as the splitting is done outside of this model.
drop_unary (bool): whether or not to drop compositions containing one element. Should probably be False always. If true,
can cause some array length mismatches.
savepath (str): path to save the data to. This is set automatically in the data_splitters routine.
Methods:
get_model: method that prepares the CrabNet model. Called when the CrabNetModel class is instantiated.
fit: method that fits the model parameters to the provided training data
Args:
X: (pd.DataFrame), dataframe of X data, needs to contain at least a column of material compositions. Note
that featurization is done internally by the model.
y: (pd.Series), series of y target data
Returns:
fitted model
predict: method that evaluates model on new data to give predictions
Args:
X: (pd.DataFrame), dataframe of X data, needs to contain at least a column of material compositions. Note
that featurization is done internally by the model.
as_frame: (bool), whether to return data as pandas dataframe (else numpy array)
Returns:
series or array of predicted values
'''
def __init__(self, composition_column, epochs, val_frac, drop_unary=False, savepath=None):
super(CrabNetModel, self).__init__()
self.composition_column = composition_column
self.epochs = epochs
self.val_frac = val_frac
self.drop_unary = drop_unary
self.savepath = savepath
#self.get_model()
[docs]
def get_model(self):
from mastml.crabnet.model import Model
from mastml.crabnet.kingcrab import CrabNet
from mastml.utils.get_compute_device import get_compute_device
compute_device = get_compute_device(prefer_last=True)
model = Model(CrabNet(compute_device=compute_device).to(compute_device),
model_name='crabnet_model', verbose=True, drop_unary=self.drop_unary)
self.model = model
return
[docs]
def fit(self, X, y):
self.get_model()
# Crabnet needs to read a csv for data. The data needs compositions with "formula" as column name and targets with "target" as column name
train_data = os.path.join(self.savepath, 'train_crabnet.csv')
val_data = os.path.join(self.savepath, 'val_crabnet.csv')
all_data = pd.DataFrame({'formula': X[self.composition_column].values, 'target': y.values})
if self.val_frac > 0:
from sklearn.model_selection import train_test_split
trains, vals = train_test_split(all_data, test_size=self.val_frac)
else:
trains = all_data
trains.to_csv(train_data, index=False)
if self.val_frac > 0:
vals.to_csv(val_data, index=False)
data_size = trains.shape[0]
batch_size = 2 ** round(np.log2(data_size) - 4)
if batch_size < 2 ** 7:
batch_size = 2 ** 7
if batch_size > 2 ** 12:
batch_size = 2 ** 12
self.batch_size = batch_size
self.model.load_data(train_data, batch_size=self.batch_size, train=True)
print(f'training with batchsize {self.model.batch_size} '
f'(2**{np.log2(self.model.batch_size):0.3f})')
if self.val_frac > 0:
self.model.load_data(val_data, batch_size=batch_size)
# Set the number of epochs, decide if you want a loss curve to be plotted
self.model.fit(epochs=self.epochs, losscurve=False)
# Save the network (saved as f"{model_name}.pth")
self.model.save_network(path=self.savepath)
return
[docs]
def predict(self, X, as_frame=True):
test_data = os.path.join(self.savepath, 'test_crabnet.csv')
data = pd.DataFrame({'formula': X[self.composition_column].values, 'target': np.zeros(shape=X.shape[0])})
data.to_csv(test_data, index=False)
self.model.load_data(test_data, batch_size=self.batch_size, train=False)
output = self.model.predict(self.model.data_loader)
preds = output[1]
if as_frame == True:
return pd.DataFrame(preds, columns=['y_pred']).squeeze()
else:
return preds.ravel()
def _make_gpr_kernel(kernel_string):
"""
Method to transform a supplied string to a kernel object for use in GPR models
Args:
kernel_string: (str), a string containing the desired name of the kernel
Return:
kernel: sklearn.gaussian_process.kernels object
"""
kernel_list = ['WhiteKernel', 'RBF', 'ConstantKernel', 'Matern', 'RationalQuadratic', 'ExpSineSquared', 'DotProduct']
kernel_operators = ['+', '*', '-']
# Parse kernel_string to identify kernel types and any kernel operations to combine kernels
kernel_types_asstr = list()
kernel_types_ascls = list()
kernel_operators_used = list()
for s in kernel_string[:]:
if s in kernel_operators:
kernel_operators_used.append(s)
# Do case for single kernel, no operators
if len(kernel_operators_used) == 0:
kernel_types_asstr.append(kernel_string)
else:
# New method, using re
unique_operators = np.unique(kernel_operators_used).tolist()
unique_operators_asstr = '['
for i in unique_operators:
unique_operators_asstr += str(i)
unique_operators_asstr += ']'
kernel_types_asstr = re.split(unique_operators_asstr, kernel_string)
for kernel in kernel_types_asstr:
kernel_ = getattr(sklearn.gaussian_process.kernels, kernel)
kernel_types_ascls.append(kernel_())
# Case for single kernel
if len(kernel_types_ascls) == 1:
kernel = kernel_types_ascls[0]
kernel_count = 0
for i, operator in enumerate(kernel_operators_used):
if i+1 <= len(kernel_operators_used):
if operator == "+":
if kernel_count == 0:
kernel = kernel_types_ascls[kernel_count] + kernel_types_ascls[kernel_count+1]
else:
kernel += kernel_types_ascls[kernel_count+1]
elif operator == "*":
if kernel_count == 0:
kernel = kernel_types_ascls[kernel_count] * kernel_types_ascls[kernel_count+1]
else:
kernel *= kernel_types_ascls[kernel_count+1]
else:
print('Warning: You have chosen an invalid operator to construct a composite kernel. Please choose'
' either "+" or "*".')
kernel_count += 1
return kernel