Commit e40304a1 authored by lucas_miranda

Added tests for model_utils.py functions

parent d3e11a9a
@@ -189,14 +189,15 @@ class uncorrelated_features_constraint(Constraint):
        self.weightage = weightage

    def get_config(self):
        """Updates Constraint metadata"""

        config = super().get_config().copy()
        config.update({"encoding_dim": self.encoding_dim, "weightage": self.weightage})
        return config

    def get_covariance(self, x):
        """Computes the covariance of the elements of the passed layer"""

        x_centered_list = []

        for i in range(self.encoding_dim):
@@ -210,7 +211,10 @@ class uncorrelated_features_constraint(Constraint):
        return covariance

    # Constraint penalty
    # noinspection PyUnusedLocal
    def uncorrelated_feature(self, x):
        """Adds a penalty on feature correlation, forcing more independent sets of weights"""

        if self.encoding_dim <= 1:
            return 0.0
        else:
@@ -229,34 +233,19 @@ class uncorrelated_features_constraint(Constraint):

# Custom Layers
class MCDropout(tf.keras.layers.Dropout):
"""Equivalent to tf.keras.layers.Dropout, but with training mode enabled at prediction time.
Useful for Montecarlo predictions"""
    def call(self, inputs, **kwargs):
        """Overrides the call method of the subclassed function"""

        return super().call(inputs, training=True)

class DenseTranspose(Layer):
    """Mirrors a tf.keras.layers.Dense instance with transposed weights.
    Useful for decoder layers in autoencoders, to force structure and
    decrease the effective number of parameters to train"""

    def __init__(self, dense, output_dim, activation=None, **kwargs):
        self.dense = dense
        self.output_dim = output_dim
@@ -264,6 +253,8 @@ class DenseTranspose(Layer):
        super().__init__(**kwargs)

    def get_config(self):
"""Updates Constraint metadata"""

        config = super().get_config().copy()
        config.update(
            {
@@ -274,20 +265,50 @@ class DenseTranspose(Layer):
        )
        return config

    # noinspection PyAttributeOutsideInit
    def build(self, batch_input_shape):
        """Updates Layer's build method"""

        self.biases = self.add_weight(
            name="bias", shape=[self.dense.input_shape[-1]], initializer="zeros"
        )
        super().build(batch_input_shape)

    def call(self, inputs, **kwargs):
        """Updates Layer's call method"""

        z = tf.matmul(inputs, self.dense.weights[0], transpose_b=True)
        return self.activation(z + self.biases)

    def compute_output_shape(self, input_shape):
        """Outputs the transposed shape"""

        return input_shape[0], self.output_dim

class KLDivergenceLayer(tfpl.KLDivergenceAddLoss):
    def __init__(self, *args, **kwargs):
        self.is_placeholder = True
        super(KLDivergenceLayer, self).__init__(*args, **kwargs)

    def get_config(self):
        config = super().get_config().copy()
        config.update(
            {"is_placeholder": self.is_placeholder,}
        )
        return config

    def call(self, distribution_a):
        kl_batch = self._regularizer(distribution_a)
        self.add_loss(kl_batch, inputs=[distribution_a])
        self.add_metric(
            kl_batch, aggregation="mean", name="kl_divergence",
        )
        self.add_metric(self._regularizer._weight, aggregation="mean", name="kl_rate")
        return distribution_a

class MMDiscrepancyLayer(Layer):
    """
    Identity transform layer that adds MM Discrepancy
@@ -383,7 +404,6 @@ class Dead_neuron_control(Layer):
        super(Dead_neuron_control, self).__init__(*args, **kwargs)

    def call(self, z, z_gauss, z_cat, **kwargs):
        # Adds metric that monitors dead neurons in the latent space
        self.add_metric(
            tf.math.zero_fraction(z_gauss), aggregation="mean", name="dead_neurons"
@@ -406,7 +426,6 @@ class Entropy_regulariser(Layer):
        config.update({"weight": self.weight})

    def call(self, z, **kwargs):
        # axis=1 increases the entropy of a cluster across instances
        # axis=0 increases the entropy of the assignment for a given instance
        entropy = K.sum(tf.multiply(z + 1e-5, tf.math.log(z) + 1e-5), axis=1)
...
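The MCDropout docstring above notes that dropout stays active at prediction time, which is what enables Monte Carlo predictions. As a purely illustrative sketch (not part of this commit; the layer sizes, the 50-pass sample count, and the toy data are arbitrary assumptions), repeated forward passes through a model containing MCDropout can be averaged into a mean prediction whose spread serves as a rough uncertainty estimate:

# Illustrative sketch only -- not part of this commit
import numpy as np
import tensorflow as tf

import deepof.model_utils

mc_model = tf.keras.Sequential(
    [
        tf.keras.layers.Dense(10, activation="relu", input_shape=(5,)),
        deepof.model_utils.MCDropout(0.5),
        tf.keras.layers.Dense(1, activation="sigmoid"),
    ]
)

X = np.random.uniform(0, 10, [100, 5]).astype("float32")

# MCDropout samples a different dropout mask on every call, even outside training
mc_samples = np.stack([mc_model(X).numpy() for _ in range(50)])
mean_prediction = mc_samples.mean(axis=0)
prediction_std = mc_samples.std(axis=0)  # rough per-sample uncertainty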
@@ -26,7 +26,7 @@ tf.config.experimental_run_functions_eagerly(True)

@settings(deadline=None)
@given(
    shape=st.tuples(
        st.integers(min_value=5, max_value=10), st.integers(min_value=5, max_value=10)
    )
)
def test_far_away_uniform_initialiser(shape):
@@ -48,7 +48,6 @@ def test_far_away_uniform_initialiser(shape):
    ),
)
def test_compute_mmd(tensor):
    tensor1 = tf.cast(tf.convert_to_tensor(tensor), dtype=tf.float32)
    tensor2 = tf.random.uniform(tensor1.shape, -300, 300, dtype=tf.float32)
@@ -60,7 +59,6 @@ def test_compute_mmd(tensor):
def test_one_cycle_scheduler():
    cycle1 = deepof.model_utils.one_cycle_scheduler(
        iterations=5, max_rate=1.0, start_rate=0.1, last_iterations=2, last_rate=0.3
    )
@@ -86,27 +84,73 @@ def test_one_cycle_scheduler():
    assert onecycle.history["lr"][4] > onecycle.history["lr"][-1]

def test_uncorrelated_features_constraint():
    X = np.random.uniform(0, 10, [1500, 5])
    y = np.random.randint(0, 2, [1500, 1])

    correlations = []

    for w in range(2):
        test_model = tf.keras.Sequential()
        test_model.add(
            tf.keras.layers.Dense(
                10,
                kernel_constraint=tf.keras.constraints.UnitNorm(axis=1),
                activity_regularizer=deepof.model_utils.uncorrelated_features_constraint(
                    2, weightage=w
                ),
            )
        )

        test_model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.SGD(),
        )

        fit = test_model.fit(X, y, epochs=10, batch_size=100)
        assert type(fit) == tf.python.keras.callbacks.History

        correlations.append(np.mean(np.corrcoef(test_model.get_weights()[0])))

    assert correlations[0] > correlations[1]

def test_MCDropout():
    X = np.random.uniform(0, 10, [1500, 5])
    y = np.random.randint(0, 2, [1500, 1])

    test_model = tf.keras.Sequential()
    test_model.add(tf.keras.layers.Dense(10))
    test_model.add(deepof.model_utils.MCDropout(0.5))

    test_model.compile(
        loss=tf.keras.losses.binary_crossentropy, optimizer=tf.keras.optimizers.SGD(),
    )

    fit = test_model.fit(X, y, epochs=10, batch_size=100)
    assert type(fit) == tf.python.keras.callbacks.History

def test_dense_transpose():
    X = np.random.uniform(0, 10, [1500, 10])
    y = np.random.randint(0, 2, [1500, 1])

    dense_1 = tf.keras.layers.Dense(10)
    dense_input = tf.keras.layers.Input(shape=(10,))
    dense_test = dense_1(dense_input)
    dense_tran = deepof.model_utils.DenseTranspose(dense_1, output_dim=10)(dense_test)
    test_model = tf.keras.Model(dense_input, dense_tran)

    test_model.compile(
        loss=tf.keras.losses.binary_crossentropy, optimizer=tf.keras.optimizers.SGD(),
    )

    fit = test_model.fit(X, y, epochs=10, batch_size=100)
    assert type(fit) == tf.python.keras.callbacks.History

# def test_KLDivergenceLayer():
#     pass
#
#
...
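The new test_dense_transpose above exercises the weight-tying pattern that the DenseTranspose docstring describes. A minimal, hypothetical autoencoder sketch along the same lines (not part of this commit; the 30-dimensional input and 8-unit bottleneck are arbitrary assumptions) might look like this:

# Illustrative sketch only -- not part of this commit
import tensorflow as tf

import deepof.model_utils

inputs = tf.keras.layers.Input(shape=(30,))
encoder_dense = tf.keras.layers.Dense(8, activation="relu")
encoded = encoder_dense(inputs)

# The decoder reuses the encoder kernel transposed, so only a bias vector is added
decoded = deepof.model_utils.DenseTranspose(
    encoder_dense, output_dim=30, activation=tf.keras.activations.sigmoid
)(encoded)

autoencoder = tf.keras.Model(inputs, decoded)
autoencoder.compile(loss="mse", optimizer="adam")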