Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • 26-refactoring-the-infrastructure
  • MPGCQM_workshop
  • app
  • backup-05-08
  • develop
  • kappa_sigma_notebook
  • master
  • staging
8 results

Target

Select target project
  • nomad-lab/analytics
1 result
Select Git revision
  • 26-refactoring-the-infrastructure
  • MPGCQM_workshop
  • app
  • backup-05-08
  • develop
  • kappa_sigma_notebook
  • master
  • staging
8 results
Show changes
Showing
with 2716 additions and 0 deletions
"""Defines the FeatureSpace class
Attributes:
comm (mpi4py.MPI.COMM_WORLD): MPI communicator
mpi_size (int): Number of MPI ranks in comm
my_rank (int): Rank of this process
"""
import inspect
import math
import sys
from itertools import chain, combinations, islice, product
from pathlib import Path
import numpy as np
import pandas as pd
from scipy import stats
from time import time
from sisso.feature_creation.nodes import operator_nodes as operator_nodes
from sisso.feature_creation.nodes.feature_node import FeatureNode
from sisso.utils.mpi_interface import allgather_object, get_mpi_start_end_from_list
class FeatureSpace:
"""Class to describe the feature space at an arbitrary order
Attributes:
allowed_ops (List of Node classes): Allowed operations in the feature space
list_sel_inds (list of lists of ints): List of selected indicies for each sis call
max_abs_feat_val (float): Maximum absolute value of the features
max_phi (int): Maximum depth of the operator trees
n_sis_select (int): Number of features to select on each SIS step
phi (list of lists of nodes): The full feature space
phi_0 (list of Nodes): The initial feature space
prop (np.ndarray): The property to be learned
row_index (list of str): keys for the index describing each material
selected_inds (list of ints): The indicies selected by SIS in a single list
"""
def __init__(
self,
phi_0,
prop,
allowed_ops="all",
max_phi=1,
n_sis_select=1,
parameterize=True,
max_abs_feat_val=1e27,
row_index=None,
fix_c_0=False,
cross_corr_max=0.95,
learn_type="correlation",
class_width=1e-6,
):
"""Initializer
Args:
phi_0 (list of Nodes): The initial feature space
prop (np.ndarray, optional): The property to be learned
allowed_ops (List of Node classes, optional): Allowed operations in the feature space
max_phi (int, optional): Maximum depth of the operator trees
n_sis_select (int, optional): Number of features to select on each SIS step
max_abs_feat_val (float, optional): Maximum absolute value of the features
row_index (list of str, optional): keys for the index describing each material
"""
self.row_index = row_index
self.fix_c_0 = fix_c_0
phi_0_not_nan = [not feat.isnan(max_abs_feat_val) for feat in phi_0]
phi_0 = list(np.array(phi_0)[np.where(phi_0_not_nan)[0]])
self.phi_0 = phi_0
self.phi_param = []
self.phi_non_param = phi_0.copy()
self.phi_param_gen_end = [0]
self.phi_non_param_gen_end = [len(phi_0)]
self.prop = prop
self.prop_centered = (prop - prop.mean()).flatten()
self.n_mats = len(prop)
self._parameterize = parameterize
self.cross_corr_max = cross_corr_max
self.learn_type = learn_type
self.class_width = class_width
if allowed_ops == "all":
clsmembers = inspect.getmembers(
sys.modules["sisso.feature_creation.nodes.operator_nodes"],
inspect.isclass,
)
self.allowed_ops = [member[1] for member in clsmembers]
elif isinstance(allowed_ops[0], str):
self.allowed_ops = [operator_nodes.op_map[op] for op in allowed_ops]
else:
self.allowed_ops = allowed_ops
self.max_phi = max_phi
self.max_abs_feat_val = max_abs_feat_val
self.selected_inds = []
self.list_sel_inds = []
self.n_sis_select = n_sis_select
self.generate_feature_space()
@classmethod
def from_df(
cls,
df,
prop_key=None,
allowed_ops="all",
cols="all",
max_phi=1,
n_sis_select=1,
max_abs_feat_val=1e27,
parameterize=True,
fix_c_0=False,
cross_corr_max=0.95,
learn_type="correlation",
class_width=1e-6,
):
"""Construct a FeatureSpace from a DataFrame
Args:
df (pandas.DataFrame): DataFrame describing the initial FeatureSpace (features as columns)
prop_key (None, optional): Key used to pull the property column
allowed_ops (List of Node classes, optional): Allowed operations in the feature space
cols (list of str, optional): Columns to pull from the DataFrame to add as the initial features
max_phi (int, optional): Maximum depth of the operator trees
n_sis_select (int, optional): Number of features to select on each SIS step
max_abs_feat_val (float, optional): Maximum value of the features
"""
if isinstance(df, str) or isinstance(df, Path):
df = pd.read_csv(df, index_col=0)
if prop_key is not None:
prop = df[[prop_key]].to_numpy()
else:
prop = None
df = df.drop([prop_key], axis=1)
row_index = df.index
if cols != "all":
for col in df.columns.tolist():
if col not in cols:
df = df.drop([col], axis=1)
return cls(
get_phi_0_from_df(df),
prop,
allowed_ops,
max_phi,
n_sis_select,
parameterize,
max_abs_feat_val,
row_index,
fix_c_0,
cross_corr_max,
learn_type,
class_width,
)
def set_value_upto_rung(self, rung=2):
"""Set values of features up a certain rung"""
if rung > self.max_phi:
rung = self.max_phi
start_p, end_p = get_mpi_start_end_from_list(self.phi_param_gen_end[rung])
start_np, end_np = self.phi_non_param_gen_end[0] + np.array(
get_mpi_start_end_from_list(
self.phi_non_param_gen_end[rung] - self.phi_non_param_gen_end[0]
)
)
for ii in range(start_p, end_p):
self.phi_param[ii].set_value()
self.phi_param[ii].set_fxn_in_value()
for ii in range(start_np, end_np):
self.phi_non_param[ii].set_value()
self.phi_non_param[ii].set_fxn_in_value()
self.phi_param = list(
chain.from_iterable(allgather_object(self.phi_param[start_p:end_p]))
)
self.phi_non_param[self.phi_non_param_gen_end[0] :] = list(
chain.from_iterable(allgather_object(self.phi_non_param[start_np:end_np]))
)
def pramaeterize(self, mat_inds):
"""Parameterize all parameterizable nodes
Args:
mat_inds (list of ints): Material indexes to include in the parameterization
"""
if not self._parameterize:
self.set_value_upto_rung()
return
start, end = get_mpi_start_end_from_list(len(self.phi_param))
for feat in self.phi_param[start:end]:
try:
feat.parameterize(self.prop, mat_inds, self.fix_c_0)
except ValueError:
feat.set_default_params()
self.phi_param = list(
chain.from_iterable(allgather_object(self.phi_param[start:end]))
)
self.set_value_upto_rung()
def generate_feature_space(self):
"""Generate the feature space from self.allowed_ops and self.phi_0
"""
# Separate allowed_ops into unary and binary operators
un_ops = []
bi_ops = []
com_bi_ops = []
un_param_ops = []
com_bi_param_ops = []
for op in self.allowed_ops:
if op not in operator_nodes.binary_ops and op in operator_nodes.param_ops:
un_param_ops.append(op)
elif op not in operator_nodes.binary_ops:
un_ops.append(op)
elif op not in operator_nodes.commutative_ops:
bi_ops.append(op)
elif op in operator_nodes.param_ops:
com_bi_param_ops.append(op)
else:
com_bi_ops.append(op)
# Generate New feature
phi = self.phi_0.copy()
rung_start = [0]
for dim_lev in range(1, self.max_phi + 1):
# Get initial start/end for current mpi rank
start, end = get_mpi_start_end_from_list(
len(phi) - rung_start[-1], rung_start[-1]
)
# Keep parameterizable separate
next_phi_param = []
for op in un_param_ops:
for feat in phi[start:end]:
try:
next_phi_param.append(op(feat))
except ValueError:
continue
next_phi = []
for op in un_ops:
for feat in phi[start:end]:
try:
next_phi.append(op(feat))
except ValueError:
continue
phi_n_mn_1_generator = combinations(phi[rung_start[-1] :], 2)
try:
f = math.factorial
n = len(phi) - rung_start[-1]
len_gen = int(round(f(n) / (f(2) * f(n - 2))))
except ValueError:
continue
start, end = get_mpi_start_end_from_list(len_gen)
for feat_combi in islice(phi_n_mn_1_generator, start, end):
for op in com_bi_ops:
try:
next_phi.append(op(feat_combi[0], feat_combi[1]))
except ValueError:
pass
for op in com_bi_param_ops:
try:
next_phi_param.append(op(feat_combi[0], feat_combi[1]))
except ValueError:
pass
for op in bi_ops:
try:
next_phi_param.append(op(feat_combi[0], feat_combi[1]))
except ValueError:
pass
try:
next_phi_param.append(op(feat_combi[1], feat_combi[0]))
except ValueError:
pass
if len(rung_start) > 1:
phi_lower_generator = product(
phi[rung_start[-1] :], phi[0 : rung_start[-1]]
)
len_gen = (len(phi) - rung_start[-1]) * (
rung_start[-1] - rung_start[-2]
)
start, end = get_mpi_start_end_from_list(len_gen)
for feat_combi in islice(phi_lower_generator, start, end):
for op in com_bi_ops:
try:
next_phi.append(op(feat_combi[0], feat_combi[1]))
except ValueError:
continue
for op in com_bi_param_ops:
try:
next_phi_param.append(op(feat_combi[0], feat_combi[1]))
except ValueError:
continue
for op in bi_ops:
try:
next_phi_param.append(op(feat_combi[0], feat_combi[1]))
except ValueError:
continue
try:
next_phi_param.append(op(feat_combi[1], feat_combi[0]))
except ValueError:
continue
not_nan = [not feat.isnan(self.max_abs_feat_val) for feat in next_phi]
next_phi = np.array(next_phi)[np.where(not_nan)[0]]
next_phi = list(chain.from_iterable(allgather_object(next_phi)))
not_nan = [not feat.isnan(self.max_abs_feat_val) for feat in next_phi_param]
next_phi_param = np.array(next_phi_param)[np.where(not_nan)[0]]
next_phi_param = list(chain.from_iterable(allgather_object(next_phi_param)))
rung_start.append(len(phi))
phi += next_phi + next_phi_param
self.phi_non_param += next_phi
self.phi_param += next_phi_param
self.phi_param_gen_end.append(len(self.phi_param))
self.phi_non_param_gen_end.append(len(self.phi_non_param))
self.pramaeterize(list(range(len(self.prop))))
def get_selected_df(self, row_index=None, selected_inds=None):
"""Get a DataFame of selected features from phi
Args:
row_index (list of keys, optional): How to label the materials
selected_inds (list of int, optional): The indices of the features to add to the DataFrame
Returns:
TYPE: Description
"""
if row_index is None:
row_index = self.row_index
selected_nodes = self.get_select_nodes(selected_inds)
col_headers = [f"{node.expr} ({node.unit})" for node in selected_nodes]
data = np.array([node.fxn_in_value for node in selected_nodes])
if row_index is None:
return pd.DataFrame(data=data.T, columns=col_headers)
return pd.DataFrame(data=data.T, columns=col_headers, index=row_index)
@property
def df(self):
"""Return a DataFrame containing only nodes selected by the SIS steps
Returns:
DaraFrame: DataFrame containing the selected nodes
"""
selected_nodes = self.sis_selected
col_headers = [f"{node.expr} ({node.unit})" for node in selected_nodes]
data = np.array([node.fxn_in_value for node in selected_nodes])
if self.row_index is None:
return pd.DataFrame(data=data.T, columns=col_headers)
return pd.DataFrame(data=data.T, columns=col_headers, index=self.row_index)
@property
def all_df(self):
"""Return a DataFrame containing only nodes selected by the SIS steps
Returns:
DaraFrame: DataFrame containing the selected nodes
"""
selected_nodes = self.phi
col_headers = [f"{node.expr} ({node.unit})" for node in selected_nodes]
data = np.array([node.fxn_in_value for node in selected_nodes])
if self.row_index is None:
return pd.DataFrame(data=data.T, columns=col_headers)
return pd.DataFrame(data=data.T, columns=col_headers, index=self.row_index)
def project_stand(self, prop, inds, mat_inds, start=0, end=-1):
prop_centered = [(pp - pp.mean()) / pp.std() for pp in prop]
feat_vals = np.array(
[feat.standardized_value[mat_inds] for feat in self.phi[inds[start:end]]]
)
return np.max(np.abs(np.dot(prop_centered, feat_vals.T)), axis=0)
def project_r(self, prop, inds, mat_inds, start=0, end=-1):
feat_vals = np.array(
[feat.value[mat_inds] for feat in self.phi[inds[start:end]]]
)
return np.max(
np.abs(
[
(np.dot(pp, ff) - len(pp) * pp.mean() * ff.mean())
/ (len(pp) * pp.std() * ff.std())
for pp in prop
for ff in feat_vals
]
).reshape(len(prop), -1),
axis=0,
)
def project_log(self, prop, inds, mat_inds, start=0, end=-1):
feat_vals = np.array(
[feat.value[mat_inds] for feat in self.phi[inds[start:end]]]
)
coef_ints = np.array(
[stats.linregress(fv, pp[mat_inds])[:2] for pp in prop for fv in feat_vals]
).reshape((len(prop), len(feat_vals), -1))
return np.max(
np.abs(
[
np.sum(np.log10(np.abs(ci[0] * fv + ci[1] - p)))
for pp, p in enumerate(prop)
for ci, fv in zip(coef_ints[pp], feat_vals)
]
).reshape((len(prop), -1)),
axis=0,
)
def convex1d_overlap(
self, set_1, set_2, min_dist, inds, mat_inds, start, end, width=1e-6
):
feat_vals_1 = np.array(
[feat.value[mat_inds[set_1]] for feat in self.phi[inds[start:end]]]
)
feat_vals_2 = np.array(
[feat.value[mat_inds[set_2]] for feat in self.phi[inds[start:end]]]
)
min_1 = np.min(feat_vals_1, axis=1)
max_1 = np.max(feat_vals_1, axis=1)
min_2 = np.min(feat_vals_2, axis=1)
max_2 = np.max(feat_vals_2, axis=1)
length = np.min(np.column_stack((max_1, max_2)), axis=1) - np.max(
np.column_stack((min_1, min_2)), axis=1
)
is_overlap = length >= 0.0
inds_no_overlap = np.where(is_overlap == False)[0]
inds_overlap = np.where(is_overlap)[0]
if len(inds_no_overlap) > 0:
min_dist[inds_no_overlap] = np.max(
np.column_stack((min_dist[inds_no_overlap], length[inds_no_overlap])),
axis=1,
)
if len(inds_overlap) > 0:
min_length = np.min(
np.column_stack(
(
np.max(feat_vals_1, axis=1) - np.min(feat_vals_1, axis=1),
np.max(feat_vals_2, axis=1) - np.min(feat_vals_2, axis=1),
)
),
axis=1,
)
overlap_length = np.zeros(len(min_length))
overlap_length[inds_overlap] = (
length[inds_overlap] / min_length[inds_overlap]
)
overlap_length[np.where(min_length == 0)[0]] = 1.0
number = np.array(
[
len(np.where((f >= min_1[ff] - width) & (f <= max_1[ff] + width))[0])
for ff, f in enumerate(feat_vals_2)
]
)
number += np.array(
[
len(np.where((f >= min_2[ff] - width) & (f <= max_2[ff] + width))[0])
for ff, f in enumerate(feat_vals_1)
]
)
return number, overlap_length, min_dist
def project_classification(self, prop, inds, mat_inds, start=0, end=-1):
"""Calculate the projection scores for classification problems
Args:
prop (np.ndarray): vector to project the features onto
inds (np.ndarray): list of indicies to calculate the scores
mat_inds (np.ndarray): list of indicies for which materials to use
start(int, optional): index to begin projection calculation
end(int, optional): index to end projection calculation
Returns:
scores (np.ndarray): scores used for SiS
"""
prop_sorted = [np.sort(pp[mat_inds]) for pp in prop]
prop_inds = [np.argsort(pp[mat_inds]) for pp in prop]
overlap_length = np.zeros((len(inds[start:end]), len(prop)))
overlap_n = np.zeros((len(inds[start:end]), len(prop)))
min_dist = np.ones((len(inds[start:end]), len(prop)))
min_dist *= -1.0 * self.max_abs_feat_val
pp = -1
for p_sort, p_inds in zip(prop_sorted, prop_inds):
pp += 1
test, start_group = np.unique(p_sort, return_index=True)
for s1, start_1 in enumerate(start_group[:-1]):
end_1 = start_group[s1 + 1]
if p_sort[start_1] == 0:
continue
for s2, start_2 in enumerate(start_group[s1 + 1 :]):
if s2 != len(start_group[s1 + 1 :]) - 1:
end_2 = start_group[s1 + 1 :][s2 + 1]
else:
end_2 = len(p_inds)
if p_sort[start_2] == 0:
continue
N, S, min_dist[:, pp] = self.convex1d_overlap(
p_inds[start_1:end_1],
p_inds[start_2:end_2],
min_dist[:, pp],
inds,
mat_inds,
start,
end,
width=self.class_width,
)
overlap_n[:, pp] += N
overlap_length[:, pp] += S
is_overlap = np.abs(overlap_length) != 0.0
inds_overlap = np.where(is_overlap)
inds_no_overlap = np.where(is_overlap == False)
min_dist[inds_overlap] = 0.0
score = len(inds[start:end]) - overlap_n
score *= 100.0 * max(
1.0 / (1.0 + np.min(overlap_length)), np.max(np.abs(min_dist))
)
score[inds_overlap[0], inds_overlap[1]] += 1.0 / (
overlap_length[inds_overlap[0], inds_overlap[1]] + 1.0
)
score[inds_no_overlap[0], inds_no_overlap[1]] -= min_dist[
inds_no_overlap[0], inds_no_overlap[1]
]
score = np.max(score, axis=1)
sort_inds = np.argsort(score)[::-1]
return score
def sis(self, prop=None, mat_inds="all"):
"""Perform a round of SIS on the dataset
Args:
prop (np.ndarray, optional): vector to project the features onto
mat_inds (list): list of materials to include
Returns:
float: The best feature's projection value
Raises:
ValueError: If prop and self.prop are None
"""
if mat_inds == "all":
mat_inds = range(len(self.prop))
mat_inds = np.array(mat_inds)
if prop is None:
prop = [self.prop.flatten()[mat_inds]]
inds = np.delete(
np.arange(len(self.phi), dtype=int), np.array(self.selected_inds, dtype=int)
)
start, end = get_mpi_start_end_from_list(len(inds))
if prop is None:
prop = self.prop
t0 = time()
if self.learn_type == "log":
projection_scores = self.project_log(prop, inds, mat_inds, start, end)
elif self.learn_type == "correlation":
projection_scores = self.project_r(prop, inds, mat_inds, start, end)
elif self.learn_type == "classification":
projection_scores = self.project_classification(
prop, inds, mat_inds, start, end
)
# print(f"projection: {time()-t0}")
projection_scores[np.where(np.isnan(projection_scores))[0]] = 0.0
projection_scores = np.array(
list(chain.from_iterable(allgather_object(projection_scores)))
)
indexes = projection_scores.argsort()[::-1]
indices_sorted = inds[indexes]
sel_inds = []
D = np.zeros((self.n_sis_select, len(mat_inds)))
if len(self.selected_inds) > 0:
D = np.vstack(
(
np.array(
[self.phi[ii].value[mat_inds] for ii in self.selected_inds]
),
D,
)
)
ii = 0
dd = len(self.selected_inds)
else:
sel_inds.append(indices_sorted[0])
D[0, :] = self.phi[indices_sorted[0]].value[mat_inds]
ii = 1
dd = 1
# print(f"generate D: {time() - t0}")
while len(sel_inds) < self.n_sis_select:
stand_val = self.phi[indices_sorted[ii]].value[mat_inds]
check_vals = np.array(
[
np.abs(stats.linregress(stand_val, D[ff, :])[2])
for ff in range(D.shape[0])
]
)
if np.max(check_vals) < self.cross_corr_max and stand_val.std() > 1e-12:
sel_inds.append(indices_sorted[ii])
D[dd, :] = stand_val
dd += 1
ii += 1
# print(f"populate_list: {time() - t0}")
self.selected_inds += list(sel_inds)
self.list_sel_inds.append(list(sel_inds))
def get_select_nodes(self, selected_inds=None):
"""Get a subset of the nodes defined by selected_inds
Args:
selected_inds (list of ints, optional): indices used to get the selected nodes
Returns:
list of Nodes: The selected nodes
"""
if selected_inds is None:
selected_inds = self.selected_inds
return self.phi[selected_inds]
def clear_selection(self):
"""Clear all selected features (used for resets)"""
self.selected_inds = []
self.list_sel_inds = []
@property
def sis_selected(self):
"""Get the list of Nodes selected for by SIS
Returns:
list of Nodes: The features selected by SIS
"""
return self.phi[self.selected_inds]
@property
def phi(self):
"""Get the complete feature space"""
return np.array(self.phi_non_param + self.phi_param)
def get_phi(self, rung=-1):
"""Get the complete feature space"""
return np.array(
self.phi_non_param[: self.phi_non_param_gen_end[rung]]
+ self.phi_param[: self.phi_param_gen_end[rung]]
)
def get_phi_0_from_df(df):
"""Summary
Args:
df (pandas.DataFrame): The DataFrame to get phi_0 from
Returns:
list of FeatureNodes: The FeatureNodes of the columns in df
"""
columns = df.columns.tolist()
tags = list([col.split("(")[0] for col in columns])
units = list([col.split("(")[1].split(")")[0] for col in columns])
values = df.to_numpy().T
phi_0 = []
for tag, unit, value in zip(tags, units, values):
if unit.lower() == "unitless" or unit.lower() == "":
unit = None
else:
unit = unit.replace(" ", "")
phi_0.append(FeatureNode(value, tag.replace(" ", ""), unit))
return phi_0
from sisso.feature_creation.nodes.node import Node
from sisso.feature_creation.nodes.feature_node import FeatureNode
"""Define FeatureNode class"""
import numpy as np
import sympy
from sisso.feature_creation.nodes.node import Node
from sisso.feature_creation.nodes.unit import Unit
class FeatureNode(Node):
"""A node used to describe a primary feature for the descriptor"""
def __init__(self, value, tag, unit=None):
"""A node used to describe a primary feature for the descriptor
Args:
value (number or array): The value of the feature for all materials
tag (str): A string that represents the feature in algebraic expressions (will be converted to sympy.Symbols)
unit (str, optional): A string that describes the unit of the feature (will be converted to sympy.Symbols)
"""
if np.any(np.isnan(value)):
raise ValueError("An element in the feature node is not a number")
super(FeatureNode, self).__init__(value, tag)
self._expr = sympy.Symbol(self._tag)
self._n_els = len(value)
if unit is None:
self._unit = Unit(dict())
else:
self._unit = Unit.from_str(unit)
@property
def expr(self):
"""
Returns:
sympy.Symbols: The Symbol representation of tag
"""
return self._expr
@property
def unit(self):
"""
Returns:
sympy.Symbols: The Symbol representation of the unit
"""
return self._unit
import numpy as np
def fxn_add(x, pvt):
"""Addition function"""
return np.add(x[pvt:], x[:pvt])
def fxn_sub(x, pvt):
"""Subtraction function"""
return np.subtract(x[pvt:], x[:pvt])
def fxn_abs_diff(x, pvt, alpha=1.0, b=1.0, c=0.0):
"""Parameterized absolute difference function"""
return b * np.abs(np.subtract(x[pvt:], alpha * x[:pvt])) + c
def fxn_mult(x, pvt):
"""Multiplication function"""
return np.multiply(x[pvt:], x[:pvt])
def fxn_div(x, pvt, alpha=1.0, a=0.0, c=0.0):
"""Parameterized division function"""
return np.divide(x[pvt:], x[:pvt] * alpha + a) + c
def fxn_exp(x, alpha=1.0, a=1.0, c=0.0):
"""Parameterized exponential function"""
return a * np.exp(alpha * x) + c
def fxn_neg_exp(x, alpha=1.0, a=1.0, c=0.0):
"""Parameterized negative exponential function"""
return a * np.exp(-alpha * x) + c
def fxn_inv(x, alpha=1.0, a=0.0, c=0.0):
"""Parameterized inverse function"""
return np.divide(1.0, alpha * x + a) + c
def fxn_sq(x):
"""Square function"""
return np.power(x, 2.0)
def fxn_cb(x):
"""Cube function"""
return np.power(x, 3.0)
def fxn_six_pwr(x):
"""Function to take the sixth power of all items in an array"""
return np.power(x, 6.0)
def fxn_sqrt(x, alpha=1.0, a=0.0, b=1.0, c=0.0):
"""Parameterized square root function"""
return b * np.sqrt(x * alpha + a) + c
def fxn_cbrt(x, alpha=1.0, a=0.0, c=0.0):
"""Parameterized cube root function"""
return np.cbrt(x * alpha + a) + c
def fxn_log(x, alpha=1.0, a=0.0, c=0.0):
"""Parameterized log function"""
return alpha * np.log(x + a) + c
def fxn_abs(x):
"""Parameterized abs function"""
return np.abs(x)
def fxn_sin(x, alpha=1.0, a=0.0, b=1.0, c=0.0):
"""Parameterized sin function"""
return b * np.sin(alpha * x + a) + c
def fxn_cos(x, alpha=1.0, a=0.0, b=1.0, c=0.0):
"""Parameterized cos function"""
return b * np.cos(alpha * x + a) + c
fxn_add.name = "add"
fxn_add.default_params = {}
add = fxn_add
fxn_sub.name = "sub"
fxn_sub.default_params = {}
sub = fxn_sub
fxn_abs_diff.default_params = {"alpha": 1.0, "a": 0.0, "c": 0.0}
fxn_abs_diff.name = "abs_diff"
abs_diff = fxn_abs_diff
fxn_mult.name = "mult"
fxn_mult.default_params = {}
mult = fxn_mult
fxn_div.name = "div"
fxn_div.default_params = {"alpha": 1.0, "a": 0.0, "c": 0.0}
div = fxn_div
fxn_exp.name = "exp"
fxn_exp.default_params = {"alpha": 1.0, "a": 1.0, "c": 0.0}
exp = fxn_exp
fxn_neg_exp.name = "neg_exp"
fxn_neg_exp.default_params = {"alpha": 1.0, "a": 0.0, "c": 0.0}
neg_exp = fxn_neg_exp
fxn_inv.name = "inv"
fxn_inv.default_params = {"alpha": 1.0, "a": 0.0, "c": 0.0}
inv = fxn_inv
fxn_sq.name = "sq"
fxn_sq.default_params = {}
sq = fxn_sq
fxn_cb.name = "cb"
fxn_cb.default_params = {}
cb = fxn_cb
fxn_six_pwr.name = "six_pwr"
fxn_six_pwr.default_params = {}
six_pwr = fxn_six_pwr
fxn_sqrt.name = "sqrt"
fxn_sqrt.default_params = {"alpha": 1.0, "a": 0.0, "b": 1.0, "c": 0.0}
sqrt = fxn_sqrt
fxn_cbrt.name = "cbrt"
fxn_cbrt.default_params = {"alpha": 1.0, "a": 0.0, "c": 0.0}
cbrt = fxn_cbrt
fxn_log.name = "log"
fxn_log.default_params = {"alpha": 1.0, "a": 0.0, "c": 0.0}
log = fxn_log
fxn_abs.name = "abs"
fxn_abs.default_params = {}
abs = fxn_abs
fxn_sin.name = "sin"
fxn_sin.default_params = {"alpha": 1.0, "a": 0.0, "b": 1.0, "c": 0.0}
sin = fxn_sin
fxn_cos.name = "cos"
fxn_cos.default_params = {"alpha": 1.0, "a": 0.0, "b": 1.0, "c": 0.0}
cos = fxn_cos
"""Define the base Node, FeatureNode and OperatorNode classes"""
import warnings
import numpy as np
import sympy
from scipy import optimize
from .unit import Unit
const = sympy.sympify("const")
class Node(object):
"""Base class for the tree representation of the descriptors"""
def __init__(self, value=None, tag=None):
"""Summary
Args:
value (number of array, optional): Value of the node
tag (str, optional): Expression describing the node
"""
self._value = value
self._tag = tag
self._expr = None
self._unit = None
def __eq__(self, node):
"""Comparison operator for nodes
Args:
node (node.Node): Node to compare against
Returns:
bool: True if self.norm_value == node.norm_value
"""
if not issubclass(type(node), Node):
return False
if np.allclose(self.norm_value, node.norm_value, atol=1e-8):
return True
return False
def __neq__(self, node):
"""Comparison operator for nodes
Args:
node (node.Node): Node to compare against
Returns:
bool: True if self.norm_value != node.norm_value
"""
return not self.__eq__(node)
@property
def value(self):
"""
Returns:
number or array: the numerical value of the node
"""
return self._value
@property
def fxn_in_value(self):
"""Calculate the value of the node by applying _func to all attached Nodes
Returns:
None: The result of applying _func to attached Nodes (None since this is a virtual class with no attached feature nodes)
"""
return self._value
@property
def norm_value(self):
"""
Returns:
array: The normalized value array
"""
value = np.array(self.fxn_in_value)
return value / np.linalg.norm(value)
def isnan(self, max_val):
"""Determines if a feature is valid
Args:
max_val(float): maximum allowed absolute value for the feature
Returns:
bool: True if a feature has nan or its maximum absolute value is greater than max_val
"""
val = self.value
return (
(np.isnan(np.dot(val, np.ones(val.shape))))
or (np.abs(val).max() > max_val)
or (val.std() < 1e-13)
)
@property
def standardized_value(self):
"""
Returns:
array: The standardized value of the node
"""
value = np.array(self.value).flatten()
return (value - value.mean()) / (value.std())
@property
def standardized_fxn_in_value(self):
"""
Returns:
array: The standardized value of the node
"""
value = np.array(self.fxn_in_value).flatten()
value -= value.mean()
value /= np.linalg.norm(value)
return value
@property
def tag(self):
"""
Returns:
expression: The string/algebraic representation of the node
"""
return self._tag
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return self._expr
@property
def unit(self):
"""
Returns:
expression: The string/algebraic representation of the node
"""
return self._unit
@expr.setter
def expr(self, expr):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
self._expr = expr
@property
def fxn_in_expr(self):
"""The sympy.Expression used to generate expressions for subsequent features that use this feature
Returns:
sympy.Expression: The algebraic representation of the new feature with only alpha used from the params
"""
return self.expr
@unit.setter
def unit(self, unit):
"""Sets the unit for a feature
Args:
unit (sympy.Expression): The representation of the unit
"""
self._unit = unit
def copy(self):
"""Copies a node
Returns:
to_cp(node.Node): A new Node that is the copy of the current node
"""
to_cp = Node(value=self.value.copy(), tag=self._tag)
to_cp.expr = self.expr
to_cp.unit = self.unit
return to_cp
"""Definition of the OperatorNode class"""
import warnings
import numpy as np
import sympy
from scipy import optimize
from sisso.feature_creation.nodes.unit import Unit
from sisso.feature_creation.nodes.node import Node
const = sympy.sympify("const")
class OperatorNode(Node):
"""Node used add mathematical/algebraic operations on top of the nodes
"""
def __init__(self, func, tag, feats=None):
"""Node used add mathematical/algebraic operations on top of the nodes
Args:
func (Function): Function to apply to the attached Nodes
tag (str): String tag that is used to represent the function in expressions
"""
super(OperatorNode, self).__init__(tag=tag)
self._func = func
self._fxn_in_value = None
self.feats = feats
self._n_els = self.feats[0]._n_els
self._expr = None
self._fxn_in_expr = None
self._unit = None
self._params = None
@property
def func(self):
"""Accessor to the function
Returns:
Function: Function to apply to the attached Nodes
"""
return self._func
@property
def value(self):
"""Calculate the value of the node by applying _func to all attached Nodes
Returns:
None: The result of applying _func to attached Nodes (None since this is a virtual class with no attached feature nodes)
"""
if self._value is not None:
return self._value
return self._func(
np.hstack([f.fxn_in_value for f in self._feats]), **self.params
)
def set_value(self):
self._value = self._func(
np.hstack([f.fxn_in_value for f in self._feats]), **self.params
)
@property
def fxn_in_value(self):
"""Calculate the value of the node by applying _func to all attached Nodes
Returns:
None: The result of applying _func to attached Nodes (None since this is a virtual class with no attached feature nodes)
"""
if self._fxn_in_value is not None:
return self._fxn_in_value
return self._func(
np.hstack([f.fxn_in_value for f in self._feats]), **self.fxn_in_params
)
def set_fxn_in_value(self):
self._fxn_in_value = self._func(
np.hstack([f.fxn_in_value for f in self._feats]), **self.fxn_in_params
)
@property
def expr(self):
"""Property accessor to the algebraic representation of the function.
Returns:
sympy.Expression: Expression used to describe the function
"""
if self._expr is None:
self._expr = self.get_expr()
return self._expr
def get_expr(self):
"""Accessor to the algebraic representation of the function.
Returns:
sympy.Expression: Expression used to describe the function
"""
exprs = [sympy.simplify(f.fxn_in_expr) for f in self.feats]
return sympy.Function(self._tag)(*exprs)
@expr.setter
def expr(self, expr):
self._expr = expr
@property
def fxn_in_expr(self):
"""Property accessor to the algebraic representation of the function.
Returns:
sympy.Expression: Expression used to describe the function
"""
if self._fxn_in_expr is None:
self._fxn_in_expr = self.get_fxn_in_expr()
return self._fxn_in_expr
def get_fxn_in_expr(self):
"""Accessor to the algebraic representation of the function.
Returns:
sympy.Expression: Expression used to describe the function
"""
return self.expr
@fxn_in_expr.setter
def fxn_in_expr(self, expr):
self._fxn_in_expr = expr
@property
def unit(self):
"""Accessor function to the unit
Returns:
sympy.Expression: the updated unit after the operation is applied
"""
if self._unit is None:
self._unit = self.get_unit()
return self._unit
def get_unit(self):
"""Calculate the new unit after applying the operation on the feat
Returns:
sympy.Expression: Expression describing the new unit
"""
return self._feats[0].unit
@unit.setter
def unit(self, unit):
self._unit = unit
@property
def feats(self):
"""Accessor to feat
Returns:
Node: The node that the operator is to be applied to
"""
return self._feats
@feats.setter
def feats(self, node_list):
"""Setter for feat
Args:
node (Node): The feature the operation is to be applied to
Raises:
TypeError: If node is not a type of Node
"""
if issubclass(type(node_list), Node):
self._feats = [node_list]
return
for node in node_list:
if not issubclass(type(node), Node):
raise TypeError("Feature node must be of a type derived from Node")
self._feats = node_list
@property
def feat(self):
return self._feats[0]
def copy(self):
"""Copies a node
Returns:
to_cp(node.Node): A new Node that is the copy of the current node
"""
to_cp = Node(value=self.value.copy(), tag=self._tag)
to_cp._expr = self.expr
to_cp._fxn_in_expr = self.fxn_in_expr
to_cp.unit = self.unit
return to_cp
def set_default_params(self):
self._params = self._func.default_params.copy()
@property
def params(self):
"""Property accessor to the algebraic representation of the function.
Returns:
sympy.Expression: Expression used to describe the function
"""
if self._params is None:
self._params = self._func.default_params.copy()
return self._params
@params.setter
def params(self, params):
self._params = params
@property
def fxn_in_params(self):
params = self.params.copy()
params.pop("c", None)
params.pop("b", None)
return self.params
def parameterize(self, prop, mat_inds="all", fix_c_0=False):
"""Parameterize the function
Args:
prop (np.ndarray): The property to fit
mat_inds (np.ndarray(int) or "all"): list of materials to include in parameterization
"""
if isinstance(mat_inds, str) and mat_inds == "all":
mat_inds = range(len(prop))
try:
self.reset_pivot(len(mat_inds))
except AttributeError:
pass
mat_inds = list(np.array(mat_inds)[self.feat.fxn_in_value[mat_inds].argsort()])
with warnings.catch_warnings():
warnings.simplefilter("error", optimize.OptimizeWarning)
self.params = self.initial_params(prop.flatten(), mat_inds)
if fix_c_0:
self.params = self.fxn_in_params
try:
params, _ = optimize.curve_fit(
self.func,
np.hstack([f.fxn_in_value[mat_inds] for f in self._feats]),
prop[mat_inds].flatten(),
list(self._params.values()),
check_finite=False,
bounds=self.bounds,
)
for key, val in zip(self._params.keys(), params):
self._params[key] = val
except (RuntimeError, optimize.OptimizeWarning) as e:
pass
if fix_c_0:
self._params["c"] = 0.0
try:
self.reset_pivot(self._n_els)
except AttributeError:
pass
import numpy as np
from scipy import interpolate
import sympy
from .abs_diff import AbsDiffNode
from .absolute_value import AbsNode
from .add import AddNode
from .cos import CosNode
from .cube import CubNode
from .cube_root import CbrtNode
from .divide import DivNode
from .exponential import ExpNode, NegExpNode
from .inverse import InvNode
from .log import LogNode
from .multiply import MultNode
from .sin import SinNode
from .sixth_power import SixthPowerNode
from .square import SqNode
from .square_root import SqrtNode
from .subtract import SubNode
const = sympy.sympify("const")
op_map = {
"add": AddNode,
"sub": SubNode,
"abs_diff": AbsDiffNode,
"mult": MultNode,
"div": DivNode,
"exp": ExpNode,
"neg_exp": NegExpNode,
"inv": InvNode,
"sq": SqNode,
"cb": CubNode,
"sixth_power": SixthPowerNode,
"sqrt": SqrtNode,
"cbrt": CbrtNode,
"log": LogNode,
"abs": AbsNode,
"sin": SinNode,
"cos": CosNode,
}
binary_ops = [AddNode, SubNode, AbsDiffNode, MultNode, DivNode]
commutative_ops = [AddNode, SubNode, AbsDiffNode, MultNode]
param_ops = [
DivNode,
AbsDiffNode,
ExpNode,
NegExpNode,
InvNode,
SqrtNode,
CbrtNode,
LogNode,
SinNode,
CosNode,
]
"""File to describe AbsDiffNode"""
import numpy as np
import sisso.feature_creation.nodes.functions as fxn
import sympy
from sisso.feature_creation.nodes.operator_node import OperatorNode
class AbsDiffNode(OperatorNode):
"""Node to add absolute difference operators to the features
"""
def __init__(self, feat_1, feat_2):
"""Node to add absolute difference operators to the features
Args:
feat_1 (Node): Feature on the left of the absolute difference
feat_2 (Node): Feature on the right of the absolute difference
Raises:
ValueError: If the unit for feat_1 and feat_2 are not the same or the resulting feature would be constant
"""
if feat_1.unit != feat_2.unit:
raise ValueError("When subtracting both features must have the same units")
if feat_1 == feat_2:
raise ValueError("When subtracting both features must be different")
func = lambda x, alpha=1.0, b=1.0, c=0.0: fxn.abs_diff(
x, self._n_els, alpha, b, c
)
func.default_params = {"alpha": 1.0, "b": 1.0, "c": 0.0}
func.name = "abs_diff"
super(AbsDiffNode, self).__init__(func, "|-|", [feat_1, feat_2])
self.bounds = ([0.0, -np.inf, -np.inf], np.inf)
def initial_params(self, prop, mat_inds):
"""Get an initial estimate of the parameters
Args:
prop (np.ndarray(float)): Property to fit to
mat_inds (np.ndarray(int)): Indexes to include in the fitting
Returns:
dict: The initial parameter guess (standard approximation as no good way to guess)
"""
return {"alpha": 1.0, "b": 1.0, "c": 0.0}
@property
def fxn_in_value(self):
"""Calculate the value of the node by applying _func to feat_1 and feat_2 in that order
Returns:
np.ndarray: The result of applying _func to attached Nodes, with only alpha set the correct value
"""
if self._fxn_in_value is not None:
return self._fxn_in_value
return self._func(
np.hstack([f.fxn_in_value for f in self.feats]), self.params["alpha"]
)
def set_fxn_in_value(self):
"""Sets the value array of the feature based off the function/feat.fxn_in_value"""
self._value = self._func(
np.hstack([f.fxn_in_value for f in self.feats]), self.params["alpha"]
)
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
new_expr = self.params["b"] * sympy.Abs(
(self.feats[0].fxn_in_expr)
- self.params["alpha"] * (self.feats[1].fxn_in_expr)
)
return sympy.trigsimp(new_expr)
@property
def fxn_in_expr(self):
"""The sympy.Expression used to generate expressions for subsequent features that use this feature
Returns:
sympy.Expression: The algebraic representation of the new feature with only alpha used from the params
"""
new_expr = sympy.Abs(
(self.feats[0].fxn_in_expr)
- self.params["alpha"] * (self.feats[1].fxn_in_expr)
)
return sympy.trigsimp(new_expr)
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return self.feats[0].unit
def reset_pivot(self, pvt):
"""reset the divider for self._func"""
self._func = lambda x, alpha=1.0, b=1.0, c=0.0: fxn.abs_diff(
x, pvt, alpha, a, c
)
self._func.default_params = {"alpha": 1.0, "b": 1.0, "c": 0.0}
self._func.name = "abs_diff"
"""File to describe AbsNode"""
import numpy as np
import sisso.feature_creation.nodes.functions as fxn
import sympy
from sisso.feature_creation.nodes.operator_node import OperatorNode
class AbsNode(OperatorNode):
"""Node to add absolute value operators to features
"""
def __init__(self, feat):
"""Node to add absolute value operators to features
Args:
feat (Node): Feature to add the absolute value operator to
Raises:
ValueError: If feat.value is strictly not-negative
"""
if np.sign(np.min(feat.value)) >= 0:
raise ValueError("Absoulte value will produce the same feature")
super(AbsNode, self).__init__(fxn.abs, "abs", feat)
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return sympy.Abs(self.feat.fxn_in_expr)
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return self.feat.unit
"""File to describe AddNode"""
import sisso.feature_creation.nodes.functions as fxn
import sympy
from sisso.feature_creation.nodes.operator_node import OperatorNode
class AddNode(OperatorNode):
"""Node to add subtraction operators to the features
"""
def __init__(self, feat_1, feat_2):
"""Node to add subtraction operators to the features
Args:
feat_1 (Node): Feature on the left of the subtraction
feat_2 (Node): Feature on the right of the subtraction
Raises:
ValueError: If the unit for feat_1 and feat_2 are not the same, the resulting feature would be a constant or go outside accepted range
# """
if feat_1.unit != feat_2.unit:
raise ValueError("When subtracting both features must have the same units")
if feat_1 == feat_2:
raise ValueError("When subtracting both features must be different")
func = lambda x: fxn.add(x, self._n_els)
func.default_params = {}
func.name = "add"
super(AddNode, self).__init__(func, "+", [feat_1, feat_2])
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
new_expr = (self.feats[0].fxn_in_expr) + (self.feats[1].fxn_in_expr)
return sympy.trigsimp(new_expr)
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return self.feats[0].unit
def reset_pivot(self, pvt):
"""reset the divider for self._func"""
self._func = lambda x: fxn.add(x, pvt)
self._func.default_params = {}
self._func.name = "add"
"""File to describe CosNode"""
import numpy as np
import sisso.feature_creation.nodes.functions as fxn
import sympy
from scipy import integrate
from sisso.feature_creation.nodes.operator_node import OperatorNode
from sisso.feature_creation.nodes.unit import Unit
class CosNode(OperatorNode):
"""Node to add cos operators to features
"""
def __init__(self, feat):
"""Node to add cos operators to features
Args:
feat (Node): Feature to add the cos operator to
Raises:
ValueError: If feat is not unitless
"""
disallowed = ["exp", "exp(-)", "log", "sin", "cos"]
if feat.tag in disallowed:
raise ValueError("Invalid feature combination")
super(CosNode, self).__init__(fxn.cos, "cos", feat)
self.bounds = (-np.inf, np.inf)
def initial_params(self, prop, mat_inds):
"""Get an initial estimate of the parameters
Args:
prop (np.ndarray(float)): Property to fit to
mat_inds (np.ndarray(int)): Indexes to include in the fitting
Returns:
dict: The initial parameter guess based on the property and self.feat.fxn_in_value
"""
val = self.feat.fxn_in_value[mat_inds]
# Get estimates for b and c from the mean and range of prop
b = (prop[mat_inds].max() - prop[mat_inds].min()) / 2.0
c = prop[mat_inds].mean()
# Rescale prop and remove outliers
prop_scaled = (prop[mat_inds] - c) / b
prop_scaled[np.where(prop_scaled > 1)] = 1
prop_scaled[np.where(prop_scaled < -1)] = -1
inds = val.argsort()
x = val[inds]
y = prop_scaled[inds]
# Take the cumulative of prop_scaled as an approximation
int_y = integrate.cumtrapz(y, x, initial=0.0)
alpha = 2.0 / (np.max(int_y) - np.min(int_y))
sin_cent = 1.0 / alpha * np.cos(alpha * x)
a = (x[np.argmin(int_y)] - x[np.argmin(sin_cent)]) * (alpha)
return {"alpha": alpha, "a": a, "b": b, "c": c}
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return (
self.params["b"]
* sympy.trigsimp(
sympy.cos(
self.params["alpha"] * self.feat.fxn_in_expr + self.params["a"]
)
)
+ self.params["c"]
)
@property
def fxn_in_expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return sympy.trigsimp(
sympy.cos(self.params["alpha"] * self.feat.fxn_in_expr + self.params["a"])
)
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return Unit()
"""File to describe CubeNode"""
import sisso.feature_creation.nodes.functions as fxn
import sympy
from sisso.feature_creation.nodes.operator_node import OperatorNode
class CubNode(OperatorNode):
"""Node to add cube operators to features
"""
def __init__(self, feat):
"""Node to add cube operators to features
Args:
feat (Node): Feature to add the cube operator to
Raises:
ValueError: If feature would leave accepted range
"""
if feat.tag == "cbrt":
raise ValueError("Invalid feature combination")
super(CubNode, self).__init__(fxn.cb, "**3", feat)
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return sympy.expand(
sympy.trigsimp(sympy.powdenest(self.feat.fxn_in_expr ** 3, force=True))
)
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return self.feat.unit ** 3.0
"""File to describe CbrtNode"""
import numpy as np
import sisso.feature_creation.nodes.functions as fxn
import sympy
from scipy import stats
from sisso.feature_creation.nodes.operator_node import OperatorNode
from sisso.feature_creation.nodes.operator_nodes.derivatives import get_deriv
class CbrtNode(OperatorNode):
"""Node to add cube root operators to features
"""
def __init__(self, feat):
"""Node to add cube root operators to features
Args:
feat (Node): Feature to add the cube root operator to
"""
disallowed = ["**3", "**6"]
if feat.tag in disallowed:
raise ValueError("Invalid feature combination")
super(CbrtNode, self).__init__(fxn.cbrt, "cbrt", feat)
self.bounds = (-1.0 * np.inf, np.inf)
def initial_params(self, prop, mat_inds):
"""Get an initial estimate of the parameters
Args:
prop (np.ndarray(float)): Property to fit to
mat_inds (np.ndarray(int)): Indexes to include in the fitting
Returns:
dict: The initial parameter guess based on the property and self.feat.fxn_in_value
"""
val = self.feat.fxn_in_value[mat_inds]
# Get the sign of the alpha parameter
alpha_sign = float(np.sign(stats.linregress(val, prop[mat_inds])[0]))
# Get and Transform the derivative of prop with respect to feat
x, prop_prim = get_deriv(val, prop[mat_inds], 1)
prop_trans = np.abs(prop_prim) ** (-1.5)
# Check if \alpha * a is in the Domain if Not do normal analysis
if np.max(np.abs(prop_prim)) > 1.0:
ind_max = np.abs(prop_prim).argmax()
x = x[ind_max:]
prop_trans = prop_trans[ind_max:]
# Disregard any strongly non-linear trends due to noise/outliers
threshold = np.median(np.abs(np.diff(prop_trans))) * 1.1
inds = np.where(np.abs(np.diff(prop_trans)) <= threshold)[0]
if np.max(np.abs(prop_prim)) > 1.0:
# Get alpha from the slope of the line
alpha = stats.linregress(x[inds], prop_trans[inds])[0]
alpha = alpha_sign * 27.0 / alpha ** 2.0
# Calculate a from the location of the max value of prop_prim and alpha
a = -1.0 * x[0] * alpha
else:
# Get the sign of a
a_sign = -1.0 * float(np.sign(stats.linregress(x, prop_prim)[0]))
# Get initial parameter guess
alpha, a = stats.linregress(x[inds], prop_trans[inds])[:2]
# Correct alpha and a
alpha = alpha_sign * 27.0 / alpha ** 2.0
a *= a_sign * (np.abs(alpha) / 3.0) ** (1.5)
# Get an estimate of the constant shift and scale factor
c = stats.linregress(np.cbrt(alpha * val + a), prop[mat_inds])[1]
return {"alpha": alpha, "a": a, "c": c}
@property
def fxn_in_value(self):
"""Calculate the value of the node by applying _func to feat_1 and feat_2 in that order
Returns:
np.ndarray: The result of applying _func to attached Nodes, with only alpha set the correct value
"""
if self._fxn_in_value is not None:
return self._fxn_in_value
return self._func(self.feat.fxn_in_value)
def set_fxn_in_value(self):
"""Sets the value array of the feature based off the function/feat.fxn_in_value"""
self._fxn_in_value = self._func(self.feat.fxn_in_value)
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return sympy.trigsimp(
sympy.powdenest(
sympy.cbrt(
self.params["alpha"] * self.feat.fxn_in_expr + self.params["a"]
),
force=True,
)
+ self.params["c"]
)
@property
def fxn_in_expr(self):
"""The sympy.Expression used to generate expressions for subsequent features that use this feature
Returns:
sympy.Expression: The algebraic representation of the new feature with only alpha used from the params
"""
return sympy.trigsimp(
sympy.powdenest(sympy.cbrt(self.feat.fxn_in_expr), force=True)
)
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return self.feat.unit ** (1.0 / 3.0)
"""Get the derivatives of the property with respect to the features"""
import numpy as np
from scipy import interpolate
def moving_avg_deriv(x, y, window_length=50):
d = np.zeros(len(x) - window_length)
for ind in range(len(x) - window_length):
window = range(ind, ind + window_length)
xy = np.sum((x[window] - np.mean(x[window])) * (y[window] - np.mean(y[window])))
xx = np.sum((x[window] - np.mean(x[window])) * (x[window] - np.mean(x[window])))
d[ind] = xy / xx
return x[window_length:-window_length], d[:-window_length]
def get_deriv(val, prop, order=1):
"""Calculate the derivative of a property with respect to a given feature
Args:
val (np.ndarray(float)): The feature value array
prop (np.ndarray(float)): The property vector
Returns:
val_avg:
"""
test = np.dot(val, prop)
inds = val.argsort()
if not np.isfinite(test) or test == 0.0:
raise ValueError("Invalid feature")
if order > 3:
raise ValueError("Requested order is too high")
order_poly = min(10, max(75, len(val) // 20))
x = val[inds].copy()
y = prop[inds].copy()
if order == 0:
try:
return np.polynomial.Legendre.fit(x, y, order_poly).linspace(1001)
except np.linalg.LinAlgError:
raise ValueError("Unable to perform initial polynomial fitting")
for ii in range(order):
try:
poly = np.polynomial.Legendre.fit(x, y, order_poly)
except np.linalg.LinAlgError:
raise ValueError("Unable to perform initial polynomial fitting")
x, y = moving_avg_deriv(*poly.linspace(1001))
return x, y
"""File to describe DivNode"""
import numpy as np
import sisso.feature_creation.nodes.functions as fxn
import sympy
from scipy import stats
from sisso.feature_creation.nodes.operator_node import OperatorNode
from sisso.feature_creation.nodes.operator_nodes.derivatives import get_deriv
class DivNode(OperatorNode):
"""Node to add Div operators to the features
"""
def __init__(self, feat_1, feat_2):
"""Node to add Div operators to the features
Args:
feat_1 (Node): Feature on the left of the Div
feat_2 (Node): Feature on the right of the Div
Raises:
ValueError: If feature would leave accepted range
"""
func = lambda x, alpha=1.0, a=0.0, c=0.0: fxn.div(x, self._n_els, alpha, a, c)
func.name = "div"
func.default_params = {"alpha": 1.0, "a": 0.0, "c": 0.0}
super(DivNode, self).__init__(func, "/", [feat_1, feat_2])
self.bounds = (-1.0 * np.inf, np.inf)
def initial_params(self, prop, mat_inds):
"""Get an initial estimate of the parameters
Args:
prop (np.ndarray(float)): Property to fit to
mat_inds (np.ndarray(int)): Indexes to include in the fitting
Returns:
dict: The initial parameter guess based on the property and self.feat.fxn_in_value
"""
val_1 = self.feats[0].fxn_in_value[mat_inds]
val_2 = self.feats[1].fxn_in_value[mat_inds]
# Get and Transform the derivative of prop with respect to feat
x, prop_prim = get_deriv(val_2, val_1 * prop[mat_inds], 1)
prop_trans = 1.0 / np.sqrt(np.abs(prop_prim))
# Check for the asymtope
if np.abs(prop_prim).max() > 10.0:
inds_stop = np.where(np.abs(prop_prim) > 10.0)[0]
a_alpha = np.mean(x[inds_stop])
prop_trans = prop_trans[: inds_stop[0]]
x = x[: inds_stop[0]]
alpha_sign = 1.0
if val_2[prop[mat_inds].argmin()] < val_2[prop[mat_inds].argmax()]:
alpha_sign = -1.0
# Disregard any strongly non-linear trends due to noise/outliers
threshold = np.median(np.abs(np.diff(prop_trans))) * 1.1
inds = np.where(np.abs(np.diff(prop_trans)) <= threshold)[0]
if np.abs(prop_prim).max() > 10.0:
alpha = alpha_sign * stats.linregress(x[inds], prop_trans[inds])[0]
a = -1.0 * a_alpha * alpha
c = np.abs(val_1 / prop[mat_inds]).min()
else:
# Get initial parameter guess
alpha, a = stats.linregress(x[inds], prop_trans[inds])[:2]
# Correct alpha and a
alpha = 2.0 * alpha ** 3.0
a *= (2.0 * alpha ** 2.0) ** (1.0 / 3.0)
# Get an estimate of the constant shift
c = stats.linregress(val_1 / (alpha * val_2 + a), prop[mat_inds])[1]
return {"alpha": alpha, "a": a, "c": c}
@property
def fxn_in_value(self):
"""Calculate the value of the node by applying _func to feat_1 and feat_2 in that order
Returns:
np.ndarray: The result of applying _func to attached Nodes, with only alpha set the correct value
"""
if self._fxn_in_value is not None:
return self._fxn_in_value
return self._func(np.hstack([f.fxn_in_value for f in self.feats]))
def set_fxn_in_value(self):
"""Sets the value array of the feature based off the function/feat.fxn_in_value"""
self._fxn_in_value = self._func(np.hstack([f.fxn_in_value for f in self.feats]))
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
new_expr = (self.feats[0].fxn_in_expr) / (
self.params["alpha"] * self.feats[1].fxn_in_expr + self.params["a"]
)
return new_expr + self.params["c"]
@property
def fxn_in_expr(self):
"""The sympy.Expression used to generate expressions for subsequent features that use this feature
Returns:
sympy.Expression: The algebraic representation of the new feature with only alpha used from the params
"""
new_expr = self.feats[0].fxn_in_expr / self.feats[1].fxn_in_expr
return sympy.trigsimp(sympy.cancel(sympy.expand(new_expr)))
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return self.feats[0].unit / self.feats[1].unit
def reset_pivot(self, pvt):
"""reset the divider for self._func"""
self._func = lambda x, alpha=1.0, a=0.0, c=0.0: fxn.div(x, pvt, alpha, a, c)
self._func.name = "div"
self._func.default_params = {"alpha": 1.0, "a": 0.0, "c": 0.0}
"""File to describe ExpNode and NegExpNode"""
import numpy as np
import sisso.feature_creation.nodes.functions as fxn
import sympy
from scipy import stats
from sisso.feature_creation.nodes.operator_node import OperatorNode
from sisso.feature_creation.nodes.operator_nodes.derivatives import get_deriv
from sisso.feature_creation.nodes.unit import Unit
class ExpNode(OperatorNode):
"""Node to add exp operators to features
"""
def __init__(self, feat):
"""Node to add exp operators to features
Args:
feat (Node): Feature to add the exp operator to
Raises:
ValueError: If feat is not unitless or feature would leave accepted range
"""
disallowed = ["exp", "exp(-)", "log", "sin", "cos"]
if feat.tag in disallowed:
raise ValueError("Invalid feature combination")
super(ExpNode, self).__init__(fxn.exp, "exp", feat)
self.bounds = ([0.0, -np.inf, -np.inf], np.inf)
def initial_params(self, prop, mat_inds):
"""Get an initial estimate of the parameters
Args:
prop (np.ndarray(float)): Property to fit to
mat_inds (np.ndarray(int)): Indexes to include in the fitting
Returns:
dict: The initial parameter guess based on the property and self.feat.fxn_in_value
"""
val = self.feat.fxn_in_value[mat_inds]
# Determine the sign of the prefactor
a_sign = float(np.sign(stats.linregress(val, prop[mat_inds])[0]))
# Get and Transform the derivative of prop with respect to feat
x, prop_prim = get_deriv(val, prop[mat_inds])
prop_trans = np.log(np.abs(prop_prim))
# Disregard any strongly non-linear trends due to noise/outliers
threshold = np.median(np.abs(np.diff(prop_trans))) * 1.1
inds = np.where(np.abs(np.diff(prop_trans)) <= threshold)[0]
# Get initial parameter guess
alpha, a = stats.linregress(x[inds], prop_trans[inds])[:2]
if alpha < 0.0:
raise ValueError("Data suggests negative exponential")
# Reformat a to be the right value
a = np.exp(a - np.log(alpha)) * a_sign
# Get an estimate of the constant shift
c = stats.linregress(a * np.exp(alpha * val), prop[mat_inds])[1]
return {"alpha": alpha, "a": a, "c": c}
@property
def fxn_in_value(self):
"""Calculate the value of the node by applying _func to feat_1 and feat_2 in that order
Returns:
np.ndarray: The result of applying _func to attached Nodes, with only alpha set the correct value
"""
if self._fxn_in_value is not None:
return self._fxn_in_value
return self._func(self.feat.fxn_in_value, self.params["alpha"])
def set_fxn_in_value(self):
"""Sets the value array of the feature based off the function/feat.fxn_in_value"""
self._fxn_in_value = self._func(self.feat.fxn_in_value, self.params["alpha"])
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return self.params["c"] + self.params["a"] * sympy.powdenest(
sympy.exp(self.params["alpha"] * self.feat.fxn_in_expr), force=True
)
@property
def fxn_in_expr(self):
"""The sympy.Expression used to generate expressions for subsequent features that use this feature
Returns:
sympy.Expression: The algebraic representation of the new feature with only alpha used from the params
"""
return sympy.powdenest(
sympy.exp(self.params["alpha"] * self.feat.fxn_in_expr), force=True
)
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return Unit()
class NegExpNode(OperatorNode):
"""Node to add the negative exponential operators to features
"""
def __init__(self, feat):
"""Node to add the negative exponential operators to features
Args:
feat (Node): Feature to add the the negative exponential operator to
Raises:
ValueError: If feat is not unitless or feature would leave accepted range
"""
disallowed = ["exp", "exp(-)", "log", "sin", "cos"]
if feat.tag in disallowed:
raise ValueError("Invalid feature combination")
super(NegExpNode, self).__init__(fxn.neg_exp, "exp(-)", feat)
self.bounds = ([0.0, -np.inf, -np.inf], np.inf)
def initial_params(self, prop, mat_inds):
"""Get an initial estimate of the parameters
Args:
prop (np.ndarray(float)): Property to fit to
mat_inds (np.ndarray(int)): Indexes to include in the fitting
Returns:
dict: The initial parameter guess based on the property and self.feat.fxn_in_value
"""
val = self.feat.fxn_in_value[mat_inds]
# Determine the sign of the prefactor
a_sign = -1.0 * float(np.sign(stats.linregress(val, prop[mat_inds])[0]))
# Get and Transform the derivative of prop with respect to feat
x, prop_prim = get_deriv(val, prop[mat_inds])
prop_trans = np.log(np.abs(prop_prim))
# Disregard any strongly non-linear trends due to noise/outliers
threshold = np.median(np.abs(np.diff(prop_trans))) * 1.1
inds = np.where(np.abs(np.diff(prop_trans)) <= threshold)[0]
# Get initial parameter guess
alpha, a = stats.linregress(-1.0 * x[inds], prop_trans[inds])[:2]
if alpha < 0.0:
raise ValueError("Data suggests non-negative exponential")
# Reformat a to be the right value
a = np.exp(a - np.log(alpha)) * a_sign
# Get an estimate of the constant shift
c = stats.linregress(a * np.exp(-1.0 * alpha * val), prop[mat_inds])[1]
return {"alpha": alpha, "a": a, "c": c}
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return self.params["c"] + self.params["a"] * sympy.powdenest(
sympy.exp(-1 * self.params["alpha"] * self.feat.fxn_in_expr), force=True
)
@property
def fxn_in_value(self):
"""Calculate the value of the node by applying _func to feat_1 and feat_2 in that order
Returns:
np.ndarray: The result of applying _func to attached Nodes, with only alpha set the correct value
"""
if self._fxn_in_value is not None:
return self._fxn_in_value
return self._func(self.feat.fxn_in_value, self.params["alpha"])
def set_fxn_in_value(self):
"""Sets the value array of the feature based off the function/feat.fxn_in_value"""
self._fxn_in_value = self._func(self.feat.fxn_in_value, self.params["alpha"])
@property
def fxn_in_expr(self):
"""The sympy.Expression used to generate expressions for subsequent features that use this feature
Returns:
sympy.Expression: The algebraic representation of the new feature with only alpha used from the params
"""
return sympy.powdenest(
sympy.exp(-1 * self.params["alpha"] * self.feat.fxn_in_expr), force=True
)
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return Unit()
"""File to describe InverseNode"""
import numpy as np
import sisso.feature_creation.nodes.functions as fxn
import sympy
from scipy import stats
from sisso.feature_creation.nodes.operator_node import OperatorNode
from sisso.feature_creation.nodes.operator_nodes.derivatives import get_deriv
class InvNode(OperatorNode):
"""Node to add inverse operators to features
"""
def __init__(self, feat):
"""Node to add inverse operators to features
Args:
feat (Node): Feature to add the inverse operator to
Raises:
ValueError: If feature would leave accepted range
"""
disallowed = ["exp", "exp(-)", "log", "inv"]
if feat.tag in disallowed:
raise ValueError("Invalid feature combination")
super(InvNode, self).__init__(fxn.inv, "inv", feat)
self.bounds = (-1.0 * np.inf, np.inf)
def initial_params(self, prop, mat_inds):
"""Get an initial estimate of the parameters
Args:
prop (np.ndarray(float)): Property to fit to
mat_inds (np.ndarray(int)): Indexes to include in the fitting
Returns:
dict: The initial parameter guess based on the property and self.feat.fxn_in_value
"""
val = self.feat.fxn_in_value[mat_inds]
# Get and Transform the derivative of prop with respect to feat
x, prop_prim = get_deriv(val, prop[mat_inds], 1)
prop_trans = 1.0 / np.sqrt(np.abs(prop_prim))
# Check for the asymtope
if np.abs(prop_prim).max() > 10.0:
inds_stop = np.where(np.abs(prop_prim) > 10.0)[0]
a_alpha = np.mean(x[inds_stop])
prop_trans = prop_trans[: inds_stop[0]]
x = x[: inds_stop[0]]
alpha_sign = 1.0
if val[prop[mat_inds].argmin()] < val[prop[mat_inds].argmax()]:
alpha_sign = -1.0
# Disregard any strongly non-linear trends due to noise/outliers
threshold = np.median(np.abs(np.diff(prop_trans))) * 1.1
inds = np.where(np.abs(np.diff(prop_trans)) <= threshold)[0]
if np.abs(prop_prim).max() > 10.0:
alpha = alpha_sign * stats.linregress(x[inds], prop_trans[inds])[0]
a = -1.0 * a_alpha * alpha
c = np.abs(prop[mat_inds]).min()
else:
# Get initial parameter guess
alpha, a = stats.linregress(x[inds], prop_trans[inds])[:2]
# Correct alpha and a
alpha = 2.0 * alpha ** 3.0
a *= (2.0 * alpha ** 2.0) ** (1.0 / 3.0)
# Get an estimate of the constant shift
c = stats.linregress(1.0 / (alpha * val + a), prop[mat_inds])[1]
return {"alpha": alpha, "a": a, "c": c}
@property
def fxn_in_value(self):
"""Calculate the value of the node by applying _func to feat_1 and feat_2 in that order
Returns:
np.ndarray: The result of applying _func to attached Nodes, with only alpha set the correct value
"""
if self._fxn_in_value is not None:
return self._fxn_in_value
return self._func(self.feat.fxn_in_value)
def set_fxn_in_value(self):
"""Sets the value array of the feature based off the function/feat.fxn_in_value"""
self._fxn_in_value = self._func(self.feat.fxn_in_value)
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return (
1 / (self.params["alpha"] * self.feat.fxn_in_expr + self.params["a"])
+ self.params["c"]
)
@property
def fxn_in_expr(self):
"""The sympy.Expression used to generate expressions for subsequent features that use this feature
Returns:
sympy.Expression: The algebraic representation of the new feature with only alpha used from the params
"""
return sympy.trigsimp(sympy.cancel(1 / self.feat.fxn_in_expr))
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return self.feat.unit.inv()
"""File to describe LogNode"""
import numpy as np
import sisso.feature_creation.nodes.functions as fxn
import sympy
from scipy import stats
from sisso.feature_creation.nodes.operator_node import OperatorNode
from sisso.feature_creation.nodes.operator_nodes.derivatives import get_deriv
from sisso.feature_creation.nodes.unit import Unit
class LogNode(OperatorNode):
"""Node to add log operators to features
"""
def __init__(self, feat):
"""Node to add log operators to features
Args:
feat (Node): Feature to add the log operator to
Raises:
ValueError: If feat is not unitless or feat.value < 1/max_abs_feat_val
"""
disallowed = ["exp", "exp(-)", "log", "sin", "cos", "inv"]
if feat.tag in disallowed:
raise ValueError("Invalid feature combination")
super(LogNode, self).__init__(fxn.log, "log", feat)
self.bounds = (-np.inf, np.inf)
def initial_params(self, prop, mat_inds):
"""Get an initial estimate of the parameters
Args:
prop (np.ndarray(float)): Property to fit to
mat_inds (np.ndarray(int)): Indexes to include in the fitting
Returns:
dict: The initial parameter guess based on the property and self.feat.fxn_in_value
"""
val = self.feat.fxn_in_value[mat_inds]
# Get and Transform the derivative of prop with respect to feat
x, prop_prim = get_deriv(val, prop[mat_inds], 1)
prop_trans = (prop_prim) ** (-1.0)
# Disregard any strongly non-linear trends due to noise/outliers
threshold = np.median(np.abs(np.diff(prop_trans))) * 1.1
inds = np.where(np.abs(np.diff(prop_trans)) <= threshold)[0]
# Get initial parameter guess
alpha, a = stats.linregress(x[inds], prop_trans[inds])[:2]
# Correct alpha and a
alpha = 1.0 / alpha
a *= alpha
# Get an estimate of the constant shift and scale factor
c = stats.linregress(alpha * np.log(val + a), prop[mat_inds])[1]
return {"alpha": alpha, "a": a, "c": c}
@property
def fxn_in_value(self):
"""Calculate the value of the node by applying _func to feat_1 and feat_2 in that order
Returns:
np.ndarray: The result of applying _func to attached Nodes, with only alpha set the correct value
"""
if self._fxn_in_value is not None:
return self._fxn_in_value
return self._func(self.feat.fxn_in_value)
def set_fxn_in_value(self):
"""Sets the value array of the feature based off the function/feat.fxn_in_value"""
self._fxn_in_value = self._func(self.feat.fxn_in_value)
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return self.params["c"] + self.params["alpha"] * sympy.logcombine(
sympy.log(self.params["a"] + self.feat.fxn_in_expr), force=True
)
@property
def fxn_in_expr(self):
"""The sympy.Expression used to generate expressions for subsequent features that use this feature
Returns:
sympy.Expression: The algebraic representation of the new feature with only alpha used from the params
"""
return sympy.logcombine(sympy.log(self.feat.fxn_in_expr), force=True)
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return Unit()
"""File to describe MultNode"""
import sisso.feature_creation.nodes.functions as fxn
from sisso.feature_creation.nodes.operator_node import OperatorNode
class MultNode(OperatorNode):
"""Node to add Mult operators to the features
"""
def __init__(self, feat_1, feat_2):
"""Node to add Mult operators to the features
Args:
feat_1 (Node): Feature on the left of the Mult
feat_2 (Node): Feature on the right of the Mult
Raises:
ValueError: If feature would leave accepted range
"""
func = lambda x: fxn.mult(x, self._n_els)
func.name = "mult"
func.default_params = {}
super(MultNode, self).__init__(func, "*", [feat_1, feat_2])
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
new_expr = self.feats[0].fxn_in_expr * self.feats[1].fxn_in_expr
return new_expr
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return self.feats[0].unit * self.feats[1].unit
def reset_pivot(self, pvt):
"""reset the divider for self._func"""
self._func = lambda x: fxn.mult(x, pvt)
self._func.name = "mult"
self._func.default_params = {}
"""File to describe SinNode"""
import numpy as np
import sisso.feature_creation.nodes.functions as fxn
import sympy
from scipy import integrate
from sisso.feature_creation.nodes.operator_node import OperatorNode
from sisso.feature_creation.nodes.unit import Unit
class SinNode(OperatorNode):
"""Node to add sin operators to features
"""
def __init__(self, feat):
"""Node to add sin operators to features
Args:
feat (Node): Feature to add the sin operator to
Raises:
ValueError: If feat is not unitless
"""
disallowed = ["exp", "exp(-)", "log", "sin", "cos"]
if feat.tag in disallowed:
raise ValueError("Invalid feature combination")
super(SinNode, self).__init__(fxn.sin, "sin", feat)
self.bounds = (-np.inf, np.inf)
def initial_params(self, prop, mat_inds):
"""Get an initial estimate of the parameters
Args:
prop (np.ndarray(float)): Property to fit to
mat_inds (np.ndarray(int)): Indexes to include in the fitting
Returns:
dict: The initial parameter guess based on the property and self.feat.fxn_in_value
"""
val = self.feat.fxn_in_value[mat_inds]
# Get estimates for b and c from the mean and range of prop
b = (prop[mat_inds].max() - prop[mat_inds].min()) / 2.0
c = prop[mat_inds].mean()
# Rescale prop and remove outliers
prop_scaled = (prop[mat_inds] - c) / b
prop_scaled[np.where(prop_scaled > 1)] = 1
prop_scaled[np.where(prop_scaled < -1)] = -1
inds = val.argsort()
x = val[inds]
y = prop_scaled[inds]
# Take the cumulative of prop_scaled as an approximation
int_y = integrate.cumtrapz(y, x, initial=0.0)
alpha = 2.0 / (np.max(int_y) - np.min(int_y))
cos_cent = 1.0 / alpha * np.sin(alpha * x)
a = (x[np.argmin(int_y)] - x[np.argmin(cos_cent)]) * (-alpha)
return {"alpha": alpha, "a": a, "b": b, "c": c}
@property
def expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return (
self.params["b"]
* sympy.trigsimp(
sympy.sin(
self.params["alpha"] * self.feat.fxn_in_expr + self.params["a"]
)
)
+ self.params["c"]
)
@property
def fxn_in_expr(self):
"""The sympy.Expression for the resulting feature
Returns:
sympy.Expression: The algebraic representation of the new feature
"""
return sympy.trigsimp(
sympy.sin(self.params["alpha"] * self.feat.fxn_in_expr + self.params["a"])
)
# @property
def get_unit(self):
"""The sympy.Expression for the unit of the resulting feature
Returns:
sympy.Expression: The resulting unit of the feature
"""
return Unit()