Merge request: Various tutorials
Closed. Luigi Sbailo requested to merge various_tutorials into master.
53 files changed: +4484 −58
"""A Regressors to find the best descriptors using SISSO
"""
import math
from itertools import combinations, islice, product
import numpy as np
from time import time
from sisso.descriptor_identifcation.model import Model
from sisso.utils.mpi_interface import allgather_object, get_mpi_start_end_from_list
class SISSO_Regressor(object):
"""A simple implementation of the SISSO algorithm (R. Ouyang, S. Curtarolo,
E. Ahmetcik et al., Phys. Rev. Mater.2, 083802 (2018)) for regression. SISSO is an iterative
approach where at each iteration first SIS (Sure Independence Sreening) and SO
(Sparsifying Operator, here l0-regularization) is applied.
Note that it becomes the orthogonal matching pursuit for n_features_per_sis_iter=1.
A more efficient fortran implementation can be found on https://github.com/rouyang2017.
Adapted from work by E. Ahmeticik for SISSO workshops
Attributes:
all_l0_combinations (bool): If True, in the l0 step all combinations out sis_collected features will be checked.
If False, combinations of features of the same SIS iterations will be neglected.
feature_set (FeatureSpace): The full feature space for the regression
models (list of Model): List of the selected models
list_of_coefs (list of lists of floats): List of (Sparse) coefficient vector of linear model
list_of_intercepts (list of floats): Intercept/bias of linear model.
n_nonzero_coefs (int): Number of nonzero coefficients/ max. number of dimension of descriptor.
rmses (list of floats): List of RMSEs for all descriptor dimensions
"""
def __init__(
self,
feature_set,
n_nonzero_coefs=1,
all_l0_combinations=True,
mat_inds="all",
fix_c_0=False,
n_res_save=1,
learn_type="correlation",
):
"""Initializer
Args:
feature_set (FeatureSpace): The full feature space for the regression
n_nonzero_coefs (int): Number of nonzero coefficients / maximum dimension of the descriptor.
all_l0_combinations (bool): If True, the l0 step checks all combinations of the SIS-collected features.
If False, combinations containing more than one feature from the same SIS iteration are neglected.
mat_inds ("all" or list of ints): Indices of the materials (samples) used for training; "all" uses every sample.
fix_c_0 (bool): If True, the intercept of the linear model is fixed at 0.
n_res_save (int): Number of best models kept per descriptor dimension.
learn_type (str): Learning mode passed to SIS and used to pick the error measure in the l0 step
("correlation", "log", or "classification").
"""
if len(feature_set.phi) < n_nonzero_coefs * feature_set.n_sis_select:
raise ValueError(
"Number of features less than total number of features selected by SIS"
)
if isinstance(mat_inds, str) and mat_inds == "all":
self.mat_inds = range(len(feature_set.prop))
else:
self.mat_inds = mat_inds
self.feature_set = feature_set
self.n_nonzero_coefs = n_nonzero_coefs
self.all_l0_combinations = all_l0_combinations
self.fix_c_0 = fix_c_0
self.learn_type = learn_type
self.models = []
self.n_res_save = n_res_save
def fit(self):
"""Fits all models using the available data
"""
# Initialize residual and p_centered
residuals = None
prop = (self.feature_set.prop[self.mat_inds]).flatten()
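# Each iteration adds one descriptor dimension: SIS screens the feature space
# against the current residuals, then an l0 step (or a simple 1D fit in the
# first iteration) picks the best feature combination.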
for i_iter in range(self.n_nonzero_coefs):
# Get reduced dataset with SIS
t0 = time()
self.feature_set.sis(residuals, self.mat_inds, self.learn_type)
# print(f"SIS: {time()-t0}")
t0 = time()
D = self.feature_set.df.values[self.mat_inds, :]
# SO step (l0-regularization), only for i_iter > 0
if i_iter == 0:
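# First iteration: fit a one-dimensional model for each of the top
# n_res_save SIS-selected features individually.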
# coefs_standardized = best_feature/p_centered.size
sel_inds = self.feature_set.selected_inds[: self.n_res_save]
sel_inds = np.array(sel_inds).reshape(-1, 1)
coefs = np.zeros((self.n_res_save, 2))
sq_error = np.zeros((self.n_res_save, 1))
for ii, ind in enumerate(sel_inds):
coefs[ii], sq_error[ii] = self.least_sq_fit(
D[:, ii], prop, self.fix_c_0
)
else:
# perform L0 regularization
sel_inds, coefs, sq_error = self._l0_regularization(D, prop)
# print(sel_inds, coefs, sq_error)
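# Map the column indices within the SIS-selected block back to indices
# into the full feature set phi.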
sel_inds = np.array(self.feature_set.selected_inds)[sel_inds]
# print(f"fit: {time()-t0}")
t0 = time()
# process and save model outcomes
self.models.append([])
for inds in sel_inds:
# Copy the selected features into node list
sel_nodes = [node.copy() for node in self.feature_set.phi[inds]]
self.models[-1].append(
Model(sel_nodes, prop, self.mat_inds, self.fix_c_0)
)
# Get residual
residuals = [
self.feature_set.prop.flatten()[self.mat_inds]
- model.predict(self.mat_inds)
for model in self.models[-1]
]
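# The residuals of the current best models drive the SIS screening of the
# next iteration (features are ranked by how well they explain what is left).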
# print(f"reidual and model creation: {time()-t0}")
def least_sq_fit(self, D_ls, P, fix_c_0=False):
"""Perform least squares fitting on a Descriptor matrix and a property array
Args:
D_ls (np.ndarray): Descriptor matrix
P (np.ndarray): Property vector
fix_c_0 (bool): If True, fix the intercept at 0
Returns:
np.ndarray(float): Coefficients of the model, with the intercept as the last entry
np.ndarray(float): Sum of squared residuals of the least-squares fit
"""
if not fix_c_0:
D_ls = np.column_stack((D_ls, np.ones(D_ls.shape[0])))
return np.linalg.lstsq(D_ls, P, rcond=-1)[:2]
else:
D_ls = D_ls.reshape(len(P), -1)
coefs, sq_error = np.linalg.lstsq(D_ls, P, rcond=-1)[:2]
return np.append(coefs, [0.0]), sq_error
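# Minimal numeric check (illustrative only): for D_ls = np.array([1., 2., 3.])
# and P = np.array([2., 4., 6.]), this method returns coefficients close to
# [2., 0.] (slope 2, intercept 0) and a residual sum of squares near zero.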
def _l0_regularization(self, D, P):
"""Performs an L0 normalization on Dataset D for property P
Args:
D (np.ndarray): Descriptor Matrix
P (np.ndarray): Property of interest
Returns:
np.ndarray: Indices of the selected features for the n_res_save best models
np.ndarray: Coefficients of those models
np.ndarray: Squared errors of those models
"""
coefs_min, indices_combi_min = None, None
inds = []
n_desc = len(self.feature_set.list_sel_inds)
for ii in range(n_desc):
start_range = ii * len(self.feature_set.list_sel_inds[ii])
end_range = (ii + 1) * len(self.feature_set.list_sel_inds[ii])
inds.append(list(range(start_range, end_range)))
if self.all_l0_combinations:
combinations_generator = combinations(np.concatenate(inds), n_desc)
f = math.factorial
n = len(np.concatenate(inds))
k = n_desc
len_gen = int(round(f(n) / (f(k) * f(n - k))))
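# len_gen is the binomial coefficient C(n, k); on Python >= 3.8 this equals math.comb(n, k).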
else:
combinations_generator = product(*inds)
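# product(*inds) takes exactly one feature from each SIS iteration, so the
# number of generated tuples is the product of the block sizes.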
len_gen = int(np.prod([len(block) for block in inds]))
start, end = get_mpi_start_end_from_list(len_gen)
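# Initialize every error with the worst case ||P||^2 so that combinations whose
# fit fails keep a large error and are never selected.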
error_list = np.ones(end - start) * np.inner(P, P)
coefs = np.zeros((error_list.shape[0], n_desc + 1))
indices = np.zeros((error_list.shape[0], n_desc), dtype=np.int64)
for cc, indices_combi in enumerate(islice(combinations_generator, start, end)):
D_ls = D[:, indices_combi]
indices[cc, :] = indices_combi
if self.learn_type != "classification":
coefs[cc, :], square_error = self.least_sq_fit(D_ls, P, self.fix_c_0)
try:
if self.learn_type == "log":
error_list[cc] = np.sum(
np.log(np.abs(coefs[cc, :-1] @ D_ls.T + coefs[cc, -1] - P))
)
else:
error_list[cc] = square_error[0]
except IndexError:
continue
else:
pass
inds = error_list.argsort()[: self.n_res_save]
results = allgather_object([coefs[inds], indices[inds], error_list[inds]])
coefs_min = np.vstack([res[0] for res in results])
indices_combi_min = np.vstack([res[1] for res in results])
sq_error = np.hstack(np.array([res[2] for res in results]))
min_inds = np.argsort(sq_error)[: self.n_res_save]
return indices_combi_min[min_inds], coefs_min[min_inds], sq_error[min_inds]
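# Rough cost example (arithmetic only): with n_nonzero_coefs=3 and n_sis_select=20,
# all_l0_combinations=True evaluates C(60, 3) = 34220 least-squares fits in the
# 3D step, whereas all_l0_combinations=False evaluates 20**3 = 8000.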
def predict(self, dim=None):
"""Makes a prediction for a given model
Args:
dim (int, optional): Dimension (number of descriptor terms) of the model to use; defaults to the highest fitted dimension
Returns:
np.ndarray: The predicted values from the chosen model
"""
if dim is None:
dim = self.n_nonzero_coefs
return self.models[dim - 1][0].predict()
def reset(self, mat_inds="all"):
"""Reinitialize the Regressor initial state
Args:
mat_inds (list of ints): A new list of material indexes to train over
"""
if isinstance(mat_inds, str) and mat_inds == "all":
self.mat_inds = range(len(self.feature_set.prop))
else:
self.mat_inds = mat_inds
# Reset all selection criteria to initial state
self.models = []
self.feature_set.clear_selection()
self.feature_set.pramaeterize(mat_inds)
def print_models(models, predict_err=None, print_model_str=True, coefs_prec=12):
"""Prints all models
Args:
models (list of Models): list of all models
predict_err (np.ndarray(float)): Prediction error for each model
print_model_str (bool): If True print the model strings
"""
if predict_err is not None:
string = "\n%12s %22s %9s" % ("RMSE", "Predict Err", "Model")
else:
string = "\n%12s %16s" % ("RMSE", "Model")
# string += "\n".join( [self._get_model_string(i_iter) for i_iter in range(self.n_nonzero_coefs)] )
for dimension, model in enumerate(models):
if predict_err is not None:
string += "\n%sD:\t%8f\t%8f\n" % (
dimension + 1,
model[0].rmse,
predict_err[dimension],
)
else:
string += "\n%sD:\t%8f\n" % (dimension + 1, model[0].rmse)
if print_model_str:
string += str(model[0]) + "\n"
print(string)
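# Illustrative call (not part of the original file): after `sisso.fit()`,
# `print_models(sisso.models)` prints the RMSE and, optionally, the string form
# of the best model for each descriptor dimension.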