Source code for mastml.legos.data_splitters

"""
The data_splitters module contains a collection of classes for generating (train_indices, test_indices) pairs from
a dataframe or a numpy array.

For more information and a list of scikit-learn splitter classes, see:
 http://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection
"""

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.neighbors import NearestNeighbors
import sklearn.model_selection as ms
try:
    from matminer.featurizers.composition import ElementFraction
except ImportError:
    print('Error with importing matminer, try re-installing and try again')
try:
    from pymatgen import Composition
except ImportError:
    print('Error with importing pymatgen, try re-installing and try again')

from math import ceil
import warnings
from sklearn.utils import check_random_state

class SplittersUnion(BaseEstimator, TransformerMixin):
    """
    Class to take the union of multiple splitting routines, so that many splitting routines can be performed at once

    Args:
        splitters: (list), a list of scikit-learn splitter objects

    Methods:
        get_n_splits: method to calculate the number of splits to perform across all splitters

            Args:
                X: (numpy array), array of X features
                y: (numpy array), array of y data
                groups: (numpy array), array of group labels

            Returns:
                (int), number of total splits to be conducted

        split: method to perform split into train indices and test indices

            Args:
                X: (numpy array), array of X features
                y: (numpy array), array of y data
                groups: (numpy array), array of group labels

            Returns:
                (numpy array), array of train and test indices
    """
    def __init__(self, splitters):
        self.splitters = splitters

    def get_n_splits(self, X, y, groups=None):
        return sum(splitter.get_n_splits(X, y, groups) for splitter in self.splitters)

    def split(self, X, y, groups=None):
        for splitter in self.splitters:
            yield from splitter.split(X, y, groups)
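# Example usage of SplittersUnion (a minimal sketch; the data arrays below are
# illustrative placeholders, not part of the original module):
#
#     >>> X = np.arange(20).reshape(10, 2)
#     >>> y = np.arange(10)
#     >>> groups = np.array([0]*5 + [1]*5)
#     >>> union = SplittersUnion(splitters=[ms.KFold(n_splits=2), ms.LeaveOneGroupOut()])
#     >>> union.get_n_splits(X, y, groups)  # 2 KFold splits + 2 group splits
#     4
#     >>> folds = list(union.split(X, y, groups))  # KFold folds first, then group folds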
class NoSplit(BaseEstimator, TransformerMixin):
    """
    Class to just train the model on the training data and test it on that same data. Sometimes referred to as a
    "Full fit" or a "Single fit", equivalent to just plotting y vs. x.

    Args:
        None (only object instance)

    Methods:
        get_n_splits: method to calculate the number of splits to perform

            Args:
                None

            Returns:
                (int), always 1 as only a single split is performed

        split: method to perform split into train indices and test indices

            Args:
                X: (numpy array), array of X features
                y: (numpy array), array of y data
                groups: (numpy array), array of group labels

            Returns:
                (numpy array), array of train and test indices (all data used as train and test for NoSplit)
    """
    def __init__(self):
        pass

    def get_n_splits(self, X=None, y=None, groups=None):
        return 1

    def split(self, X, y, groups=None):
        indices = np.arange(X.shape[0])
        return [[indices, indices]]
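# Example usage of NoSplit (a minimal sketch): a single "full fit" split in
# which every row appears in both the train and test indices.
#
#     >>> X = np.arange(6).reshape(3, 2)
#     >>> NoSplit().split(X, y=None)
#     [[array([0, 1, 2]), array([0, 1, 2])]]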
class JustEachGroup(BaseEstimator, TransformerMixin):
    """
    Class to train the model on one group at a time and test it on the rest of the data

    This class wraps scikit-learn's LeavePGroupsOut with P set to n-1. More information is available at:
    http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.LeavePGroupsOut.html

    Args:
        None (only object instance)

    Methods:
        get_n_splits: method to calculate the number of splits to perform

            Args:
                groups: (numpy array), array of group labels

            Returns:
                (int), number of unique groups, indicating number of splits to perform

        split: method to perform split into train indices and test indices

            Args:
                X: (numpy array), array of X features
                y: (numpy array), array of y data
                groups: (numpy array), array of group labels

            Returns:
                (numpy array), array of train and test indices
    """
    def __init__(self):
        pass

    def get_n_splits(self, X=None, y=None, groups=None):
        return np.unique(groups).shape[0]

    def split(self, X, y, groups):
        n_groups = self.get_n_splits(groups=groups)
        lpgo = ms.LeavePGroupsOut(n_groups=n_groups-1)
        return lpgo.split(X, y, groups)
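# Example usage of JustEachGroup (a minimal sketch): with three groups, each of
# the three splits trains on exactly one group and tests on the remaining two.
#
#     >>> X = np.arange(12).reshape(6, 2)
#     >>> y = np.arange(6)
#     >>> groups = np.array([0, 0, 1, 1, 2, 2])
#     >>> jeg = JustEachGroup()
#     >>> jeg.get_n_splits(groups=groups)
#     3
#     >>> for train_inds, test_inds in jeg.split(X, y, groups):
#     ...     pass  # train_inds covers one group; test_inds covers the other two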
#class WithoutElement(BaseEstimator, TransformerMixin):
#    " Train the model without each element, then test on the rows with that element "
#    pass
class LeaveCloseCompositionsOut(ms.BaseCrossValidator):
    r"""
    Leave-P-out where you exclude materials with compositions close to those in the test set

    Computes the distance between the element fraction vectors. For example, the :math:`L_2` distance
    between Al and Cu is :math:`\sqrt{2}` and the :math:`L_1` distance between Al and Al0.9Cu0.1 is 0.2.

    Consequently, this splitter requires a list of compositions as the input to `split` rather than the features.

    Args:
        dist_threshold (float): Entries must be farther than this distance from the test composition to be
            included in the training set
        nn_kwargs (dict): Keyword arguments for the scikit-learn NearestNeighbors class used to find the nearest points
    """

    def __init__(self, dist_threshold=0.1, nn_kwargs=None):
        super(LeaveCloseCompositionsOut, self).__init__()
        if nn_kwargs is None:
            nn_kwargs = {}
        self.dist_threshold = dist_threshold
        self.nn_kwargs = nn_kwargs

    def split(self, X, y=None, groups=None):
        # Generate the composition vectors
        frac_computer = ElementFraction()
        elem_fracs = frac_computer.featurize_many(list(map(Composition, X)), pbar=False)

        # Generate the nearest-neighbor lookup tool
        neigh = NearestNeighbors(**self.nn_kwargs)
        neigh.fit(elem_fracs)

        # Generate a list of all entries
        all_inds = np.arange(0, len(X), 1)

        # Loop through each entry in X
        for i, x in enumerate(elem_fracs):
            # Get all the entries within the threshold distance of the test point
            too_close, = neigh.radius_neighbors([x], self.dist_threshold, return_distance=False)

            # Get the training set as "not these points"
            train_inds = np.setdiff1d(all_inds, too_close)
            yield train_inds, [i]

    def get_n_splits(self, X=None, y=None, groups=None):
        return len(X)
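# Example usage of LeaveCloseCompositionsOut (a minimal sketch; requires matminer
# and pymatgen, and the compositions shown are illustrative):
#
#     >>> compositions = ['Al', 'Al0.9Cu0.1', 'Fe']
#     >>> lcco = LeaveCloseCompositionsOut(dist_threshold=0.3)
#     >>> for train_inds, test_inds in lcco.split(compositions):
#     ...     pass  # under the default L2 metric, Al and Al0.9Cu0.1 are ~0.14 apart,
#     ...           # so each is excluded from the other's training set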
class LeaveOutPercent(BaseEstimator, TransformerMixin):
    """
    Class to train the model on a random subset of the data, leaving out a fixed percentage of the data as the test set

    Args:
        percent_leave_out (float): fraction of data to leave out as the test set (must be > 0 and < 1)

        n_repeats (int): number of repeated splits to perform (must be >= 1)

    Methods:
        get_n_splits: method to return the number of splits to perform

            Args:
                groups: (numpy array), array of group labels

            Returns:
                (int), number of repeated splits (n_repeats) to perform

        split: method to perform split into train indices and test indices

            Args:
                X: (numpy array), array of X features
                y: (numpy array), array of y data
                groups: (numpy array), array of group labels

            Returns:
                (numpy array), array of train and test indices
    """
    def __init__(self, percent_leave_out=0.2, n_repeats=5):
        self.percent_leave_out = percent_leave_out
        self.n_repeats = n_repeats

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_repeats

    def split(self, X, y, groups=None):
        indices = range(X.shape[0])
        split = list()
        for i in range(self.n_repeats):
            trains, tests = ms.train_test_split(indices, test_size=self.percent_leave_out,
                                                random_state=np.random.randint(1, 1000), shuffle=True)
            split.append((trains, tests))
        return split
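# Example usage of LeaveOutPercent (a minimal sketch): leave out 20% of the rows
# as the test set, repeated 5 times with a fresh random shuffle each repeat.
#
#     >>> X = np.arange(20).reshape(10, 2)
#     >>> y = np.arange(10)
#     >>> lop = LeaveOutPercent(percent_leave_out=0.2, n_repeats=5)
#     >>> splits = lop.split(X, y)
#     >>> len(splits)
#     5
#     >>> len(splits[0][1])  # 20% of 10 rows held out as the test set
#     2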
class Bootstrap(object):
    """
    # Note: Bootstrap taken directly from sklearn Github
    # (https://github.com/scikit-learn/scikit-learn/blob/0.11.X/sklearn/cross_validation.py)
    # which was necessary as it was later removed from more recent sklearn releases

    Random sampling with replacement cross-validation iterator

    Provides train/test indices to split data in train test sets while resampling the input
    n_bootstraps times: each time a new random split of the data is performed and then samples
    are drawn (with replacement) on each side of the split to build the training and test sets.

    Note: contrary to other cross-validation strategies, bootstrapping will allow some samples
    to occur several times in each split. However, a sample that occurs in the train split will
    never occur in the test split and vice-versa. If you want each sample to occur at most once
    you should probably use ShuffleSplit cross validation instead.

    Args:
        n : int
            Total number of elements in the dataset.

        n_bootstraps : int (default is 3)
            Number of bootstrapping iterations.

        train_size : int or float (default is 0.5)
            If int, number of samples to include in the training split (should be smaller than
            the total number of samples passed in the dataset). If float, should be between 0.0
            and 1.0 and represent the proportion of the dataset to include in the train split.

        test_size : int or float or None (default is None)
            If int, number of samples to include in the test split (should be smaller than the
            total number of samples passed in the dataset). If float, should be between 0.0 and
            1.0 and represent the proportion of the dataset to include in the test split. If
            None, test_size is set as the complement of train_size.

        random_state : int or RandomState
            Pseudo-random number generator state used for random sampling.
    """

    # Static marker to be able to introspect the CV type
    indices = True

    def __init__(self, n, n_bootstraps=3, train_size=.5, test_size=None,
                 n_train=None, n_test=None, random_state=0):
        self.n = n
        self.n_bootstraps = n_bootstraps
        if n_train is not None:
            train_size = n_train
            warnings.warn(
                "n_train is deprecated in 0.11 and scheduled for "
                "removal in 0.12, use train_size instead",
                DeprecationWarning, stacklevel=2)
        if n_test is not None:
            test_size = n_test
            warnings.warn(
                "n_test is deprecated in 0.11 and scheduled for "
                "removal in 0.12, use test_size instead",
                DeprecationWarning, stacklevel=2)

        if isinstance(train_size, float) and train_size >= 0.0 and train_size <= 1.0:
            self.train_size = ceil(train_size * n)
        elif isinstance(train_size, int):
            self.train_size = train_size
        else:
            raise ValueError("Invalid value for train_size: %r" % train_size)
        if self.train_size > n:
            raise ValueError("train_size=%d should not be larger than n=%d" %
                             (self.train_size, n))

        if isinstance(test_size, float) and test_size >= 0.0 and test_size <= 1.0:
            self.test_size = ceil(test_size * n)
        elif isinstance(test_size, int):
            self.test_size = test_size
        elif test_size is None:
            self.test_size = self.n - self.train_size
        else:
            raise ValueError("Invalid value for test_size: %r" % test_size)
        if self.test_size > n:
            raise ValueError("test_size=%d should not be larger than n=%d" %
                             (self.test_size, n))

        self.random_state = random_state

    def __iter__(self):
        rng = check_random_state(self.random_state)
        for i in range(self.n_bootstraps):
            # random partition
            permutation = rng.permutation(self.n)
            ind_train = permutation[:self.train_size]
            ind_test = permutation[self.train_size:self.train_size + self.test_size]

            # bootstrap in each split individually
            train = rng.randint(0, self.train_size, size=(self.train_size,))
            test = rng.randint(0, self.test_size, size=(self.test_size,))
            yield ind_train[train], ind_test[test]

    def __repr__(self):
        return ('%s(%d, n_bootstraps=%d, train_size=%d, test_size=%d, '
                'random_state=%d)' % (
                    self.__class__.__name__,
                    self.n,
                    self.n_bootstraps,
                    self.train_size,
                    self.test_size,
                    self.random_state,
                ))

    def __len__(self):
        return self.n_bootstraps

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.__len__()

    def split(self, X, y, groups=None):
        split = list()
        for trains, tests in self:
            split.append((trains.tolist(), tests.tolist()))
        return split
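# Example usage of Bootstrap (a minimal sketch): three bootstrap resamplings of a
# 10-row dataset, each drawn with replacement from a fresh 50/50 partition.
#
#     >>> boot = Bootstrap(10, n_bootstraps=3, train_size=0.5, random_state=0)
#     >>> len(boot)
#     3
#     >>> for train_inds, test_inds in boot:
#     ...     pass  # indices may repeat within a side, but never cross sides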
name_to_constructor = {
    # sklearn splitters:
    'Bootstrap': Bootstrap,
    'GroupKFold': ms.GroupKFold,
    'GroupShuffleSplit': ms.GroupShuffleSplit,
    'KFold': ms.KFold,
    'LeaveOneGroupOut': ms.LeaveOneGroupOut,
    'LeavePGroupsOut': ms.LeavePGroupsOut,
    'LeaveOneOut': ms.LeaveOneOut,
    'LeavePOut': ms.LeavePOut,
    'PredefinedSplit': ms.PredefinedSplit,
    'RepeatedKFold': ms.RepeatedKFold,  # NOTE: can use for repeated leave percent out / kfold
    'RepeatedStratifiedKFold': ms.RepeatedStratifiedKFold,
    'ShuffleSplit': ms.ShuffleSplit,  # NOTE: like leave percent out
    'StratifiedKFold': ms.StratifiedKFold,
    'StratifiedShuffleSplit': ms.StratifiedShuffleSplit,
    'TimeSeriesSplit': ms.TimeSeriesSplit,

    # mastml splitters
    'NoSplit': NoSplit,
    'JustEachGroup': JustEachGroup,
    'LeaveCloseCompositionsOut': LeaveCloseCompositionsOut,
    'LeaveOutPercent': LeaveOutPercent,
    #'WithoutElement': WithoutElement,
}
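# Example usage of name_to_constructor (a minimal sketch): the mapping above lets
# calling code (e.g., a configuration file) name a splitter as a string and
# construct it at runtime.
#
#     >>> splitter = name_to_constructor['RepeatedKFold'](n_splits=5, n_repeats=2)
#     >>> splitter.get_n_splits()
#     10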