Source code for mastml.domain

'''
This module contains a collection of routines to perform domain of applicability evaluations
'''
import os
import re
import itertools
import numpy as np
import pandas as pd
from pymatgen.core import Composition
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import RepeatedKFold
from sklearn.model_selection import ShuffleSplit
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import ConstantKernel, WhiteKernel, Matern

from sklearn.model_selection import GridSearchCV
from sklearn.cluster import AgglomerativeClustering
from madml.splitters import BootstrappedLeaveClusterOut
from madml.models import dissimilarity
from madml.models import calibration
from madml.assess import nested_cv
from madml.models import combine

[docs] class Domain(): def __init__(self, check_type, preprocessor=None, model=None, params=None, path=None): self.check_type = check_type # The type of domain check self.params = params self.model = model self.preprocessor = preprocessor self.path = path # Convert a string to elements
[docs] def convert_string_to_elements(self, x): x = re.sub('[^a-zA-Z]+', '', x) x = Composition(x) x = x.chemical_system x = x.split('-') return x
# Check if all elements from a reference are the same as another case
[docs] def compare_elements(self, ref, x): # Check if any reference material in another observation condition = [True if i in ref else False for i in x] if all(condition): return 1 elif any(condition): return 0 else: return -1
[docs] def fit(self, X_train=None, y_train=None): # Data here is the chemical groups if self.check_type == 'elemental': chem_ref = X_train.apply(self.convert_string_to_elements) # Merge training cases to check if each test case is within self.chem_ref = set(itertools.chain.from_iterable(chem_ref)) # Data here is features and target variable elif self.check_type == 'gpr': self.pipe = Pipeline([ ('scaler', StandardScaler()), ('model', GaussianProcessRegressor(kernel=ConstantKernel()*Matern()+WhiteKernel(), n_restarts_optimizer=10)), ]) splitter = RepeatedKFold(n_repeats=1, n_splits=5) stds = [] for tr_index, te_index in splitter.split(X_train.values): self.pipe.fit( X_train.values[tr_index], y_train.values[tr_index] ) _, std = self.pipe.predict( X_train.values[te_index], return_std=True ) stds = np.concatenate((stds, std), axis=None) self.max_std = max(stds) # STD, ouch. Better get a check up # Train one more time with all training data self.pipe.fit( X_train.values, y_train.values ) elif self.check_type == 'feature_range': self.features = X_train.columns.tolist() self.feature_mins = dict() self.feature_maxes = dict() for f in self.features: self.feature_mins[f] = min(X_train[f]) self.feature_maxes[f] = max(X_train[f]) elif self.check_type == 'madml': # The machine learning pipeline pipe = Pipeline(steps=[ ('scaler', self.preprocessor.preprocessor), ('model', self.model.model), ]) # Need GridSearchCV object for compatability gs_model = GridSearchCV( pipe, {}, cv=ShuffleSplit(n_splits=1), ) ds_model = dissimilarity(dis='kde') if 'bandwidth' in self.params: ds_model.bandwidth= self.params['bandwidth'] if 'kernel' in self.params: ds_model.kernel = self.params['kernel'] # Uncertainty estimation model if 'uq_coeffs' in self.params: uq_model = calibration(params=self.params['uq_coeffs']) elif 'uq_function' in self.params: uq_model = calibration( uq_func=self.params['uq_function'], prior=True ) else: uq_model = calibration(params=[0.0, 1.0]) # UQ model # Types of sampling to test splits = [('fit', RepeatedKFold(n_repeats=self.params['n_repeats']))] # Boostrap, cluster data, and generate splits if 'n_clusters' in self.params: n_clusters = self.params['n_clusters'] else: n_clusters = [2, 3] for i in n_clusters: # Cluster Splits top_split = BootstrappedLeaveClusterOut( AgglomerativeClustering, n_repeats=self.params['n_repeats'], n_clusters=i ) splits.append(('agglo_{}'.format(i), top_split)) # Fit models model = combine(gs_model, ds_model, uq_model, splits) # Arguments may have different names in MADML implementation if 'bins' in self.params: model.bins = self.params['bins'] if 'gt_rmse' in self.params: model.gt_rmse = self.params['gt_rmse'] if 'gt_area' in self.params: model.gt_area = self.params['gt_area'] cv = nested_cv(model=model, X=X_train.values, y=y_train.values.ravel(), g=np.array(['None']*y_train.shape[0]), splitters=splits, ) _, __, self.madml_model = cv.test(save_outer_folds=os.path.join(self.path, 'madml_domain'))
[docs] def predict(self, X_test, *args, **kwargs): domains = dict() if self.check_type == 'elemental': chem_test = X_test.apply(self.convert_string_to_elements) domain_vals = [] for i in chem_test: d = self.compare_elements(self.chem_ref, i) domain_vals.append(d) domains['domain_elemental'] = domain_vals elif self.check_type == 'gpr': _, std = self.pipe.predict(X_test.values, return_std=True) domain_vals = std <= self.max_std domain_vals = [1 if i == True else -1 for i in domain_vals] domains['domain_gpr'] = domain_vals elif self.check_type == 'feature_range': features_inside_training_range = list() features_outside_training_full = list() for i, x in X_test.iterrows(): num_OK = len(self.features) features_outside_training = '' for f in self.features: is_OK = True if x[f] < self.feature_mins[f]: is_OK = False # features_outside_training.append(f) features_outside_training += str(f) + ', ' if x[f] > self.feature_maxes[f]: is_OK = False # features_outside_training.append(f) features_outside_training += str(f) + ', ' if is_OK == False: num_OK -= 1 # if is_OK == True: # features_outside_training.append('n/a') features_inside_training_range.append(num_OK) features_outside_training_full.append(features_outside_training) domains['domain_feature_range_numfeatsinside'] = features_inside_training_range domains['domain_feature_range_featsoutside'] = features_outside_training_full elif self.check_type == 'madml': if "madml_thresholds" in kwargs.keys(): t = kwargs['madml_thresholds'] th = [] for i in t: i = ['dist', i[0], i[1]] if i[1] == 'residual': i[1] = 'id' elif i[1] == 'uncertainty': i[1] = 'id_bin' th.append(i) domains = self.madml_model.predict(X_test, th) else: domains = self.madml_model.predict(X_test) return domains domains = pd.DataFrame(domains) return domains