# @author lucasmiranda42
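"""keras-tuner hypermodels defining tunable sequence-to-sequence autoencoders:
a deterministic AE (SEQ_2_SEQ_AE) and a variational AE (SEQ_2_SEQ_VAE) with
ELBO and/or MMD regularisation, plus placeholders for two further variants."""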

from kerastuner import HyperModel
from tensorflow.keras import Input, Model, Sequential
from tensorflow.keras.constraints import UnitNorm
from tensorflow.keras.layers import Bidirectional, Dense, Dropout
from tensorflow.keras.layers import Lambda, LSTM
from tensorflow.keras.layers import RepeatVector, TimeDistributed
from tensorflow.keras.losses import Huber
from tensorflow.keras.optimizers import Adam
# model_utils supplies the custom components used below: DenseTranspose,
# KLDivergenceLayer, MMDiscrepancyLayer, UncorrelatedFeaturesConstraint, sampling
from source.model_utils import *
import tensorflow as tf


class SEQ_2_SEQ_AE(HyperModel):
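    """Tunable sequence-to-sequence autoencoder.

    The encoder stacks a causal Conv1D, two bidirectional LSTMs and three dense
    layers down to a bottleneck of size ENCODING; the decoder mirrors the dense
    stack with tied (transposed) weights before two bidirectional LSTMs rebuild
    the sequence. `input_shape` is expected as (samples, timesteps, features).
    """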
    def __init__(self, input_shape):
        super().__init__()
        self.input_shape = input_shape

    def build(self, hp):
        # Hyperparameters to tune
        CONV_filters = hp.Int(
            "units_conv", min_value=32, max_value=256, step=32, default=256
        )
        LSTM_units_1 = hp.Int(
            "units_lstm", min_value=128, max_value=512, step=32, default=256
        )
        LSTM_units_2 = LSTM_units_1 // 2
        DENSE_1 = LSTM_units_2
        DENSE_2 = hp.Int(
            "units_dense1", min_value=32, max_value=256, step=32, default=64
        )
        DROPOUT_RATE = hp.Float(
            "dropout_rate", min_value=0.0, max_value=0.5, default=0.25, step=0.05
        )
        ENCODING = hp.Int(
            "units_dense2", min_value=32, max_value=128, step=32, default=32
        )

        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            input_shape=self.input_shape[1:],
        )
        Model_E1 = Bidirectional(
            LSTM(
                LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(DENSE_1, activation="relu", kernel_constraint=UnitNorm(axis=0))
        Model_E4 = Dense(DENSE_2, activation="relu", kernel_constraint=UnitNorm(axis=0))
        Model_E5 = Dense(
            ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
        )

        # Decoder layers
        Model_D4 = Bidirectional(
            LSTM(
                LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate the encoder
        encoder = Sequential(name="DLC_encoder")
        encoder.add(Model_E0)
        encoder.add(Model_E1)
        encoder.add(Model_E2)
        encoder.add(Model_E3)
        encoder.add(Dropout(DROPOUT_RATE))
        encoder.add(Model_E4)
        encoder.add(Model_E5)

        # Define and instantiate the decoder; the dense layers tie their weights
        # to the encoder counterparts via DenseTranspose, so each output width
        # mirrors the corresponding encoder input width
        decoder = Sequential(name="DLC_Decoder")
        decoder.add(
            DenseTranspose(
                Model_E5, activation="relu", input_shape=(ENCODING,), output_dim=DENSE_2
            )
        )
        decoder.add(DenseTranspose(Model_E4, activation="relu", output_dim=DENSE_1))
        decoder.add(DenseTranspose(Model_E3, activation="relu", output_dim=LSTM_units_1))
        decoder.add(RepeatVector(self.input_shape[1]))
        decoder.add(Model_D4)
        decoder.add(Model_D5)
        decoder.add(TimeDistributed(Dense(self.input_shape[2])))

        model = Sequential([encoder, decoder], name="DLC_Autoencoder")

        model.compile(
            loss=Huber(reduction="sum", delta=100.0),
            optimizer=Adam(
                learning_rate=hp.Float(
                    "learning_rate",
                    min_value=1e-4,
                    max_value=1e-2,
                    sampling="LOG",
                    default=1e-3,
                ),
                clipvalue=0.5,
            ),
            metrics=["mae"],
        )

        return model
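
# Usage sketch (hypothetical, not part of the original module): tuning
# SEQ_2_SEQ_AE with keras-tuner's RandomSearch. `X_train` is an assumed
# preprocessed array of shape (samples, timesteps, features).
#
#   from kerastuner.tuners import RandomSearch
#
#   tuner = RandomSearch(
#       SEQ_2_SEQ_AE(input_shape=X_train.shape),
#       objective="val_mae",
#       max_trials=10,
#       directory="tuner_logs",
#       project_name="seq2seq_ae",
#   )
#   tuner.search(X_train, X_train, epochs=10, validation_split=0.2)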


class SEQ_2_SEQ_VAE(HyperModel):
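    """Tunable sequence-to-sequence variational autoencoder.

    Shares the SEQ_2_SEQ_AE topology, but the bottleneck parameterises a latent
    Gaussian (z_mean, z_log_sigma) that is regularised with a KL divergence
    penalty (ELBO), maximum mean discrepancy (MMD), or both, as selected by
    `loss`.
    """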
    def __init__(self, input_shape, loss="ELBO+MMD"):
        super().__init__()
        self.input_shape = input_shape
        self.loss = loss

        assert self.loss in [
            "MMD",
            "ELBO",
            "ELBO+MMD",
        ], "Loss function not recognised. Select one of ELBO, MMD and ELBO+MMD"

    def build(self, hp):
        # Hyperparameters to tune
        CONV_filters = hp.Int(
            "units_conv", min_value=32, max_value=256, step=32, default=256
        )
        LSTM_units_1 = hp.Int(
            "units_lstm", min_value=128, max_value=512, step=32, default=256
        )
        LSTM_units_2 = LSTM_units_1 // 2
        DENSE_1 = LSTM_units_2
        DENSE_2 = hp.Int(
            "units_dense1", min_value=32, max_value=256, step=32, default=64
        )
        DROPOUT_RATE = hp.Float(
            "dropout_rate", min_value=0.0, max_value=0.5, default=0.25, step=0.05
        )
        ENCODING = hp.Int(
            "units_dense2", min_value=32, max_value=128, step=32, default=32
        )

        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
        )
        Model_E1 = Bidirectional(
            LSTM(
                LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(DENSE_1, activation="relu", kernel_constraint=UnitNorm(axis=0))
        Model_E4 = Dense(DENSE_2, activation="relu", kernel_constraint=UnitNorm(axis=0))
        Model_E5 = Dense(
            ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
        )

        # Decoder layers
        Model_D4 = Bidirectional(
            LSTM(
                LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate the encoder
        x = Input(shape=self.input_shape[1:])
        encoder = Model_E0(x)
        encoder = Model_E1(encoder)
        encoder = Model_E2(encoder)
        encoder = Model_E3(encoder)
        encoder = Dropout(DROPOUT_RATE)(encoder)
        encoder = Model_E4(encoder)
        encoder = Model_E5(encoder)

        # parameters of the latent Gaussian
        z_mean = Dense(ENCODING)(encoder)
        z_log_sigma = Dense(ENCODING)(encoder)

        if "ELBO" in self.loss:
            z_mean, z_log_sigma = KLDivergenceLayer()([z_mean, z_log_sigma])

        # `sampling` (imported from source.model_utils) is assumed to implement
        # the reparameterisation trick: z = z_mean + exp(z_log_sigma) * epsilon
        z = Lambda(sampling)([z_mean, z_log_sigma])

        if "MMD" in self.loss:
            z = MMDiscrepancyLayer()(z)

        # Define and instantiate the decoder (dense weights tied to the encoder
        # via DenseTranspose, so output widths mirror the encoder input widths)
        decoder = DenseTranspose(Model_E5, activation="relu", output_dim=DENSE_2)(z)
        decoder = DenseTranspose(Model_E4, activation="relu", output_dim=DENSE_1)(
            decoder
        )
        decoder = DenseTranspose(Model_E3, activation="relu", output_dim=LSTM_units_1)(
            decoder
        )
        decoder = RepeatVector(self.input_shape[1])(decoder)
        decoder = Model_D4(decoder)
        decoder = Model_D5(decoder)
        x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(decoder)

        # end-to-end autoencoder
        vae = Model(x, x_decoded_mean)

        def huber_loss(x, x_decoded_mean):
            # summed Huber reconstruction loss, scaled by the sequence length
            huber = Huber(reduction="sum", delta=100.0)
            return self.input_shape[1] * huber(x, x_decoded_mean)

        vae.compile(
            loss=huber_loss,
            optimizer=Adam(
                learning_rate=hp.Float(
                    "learning_rate",
                    min_value=1e-4,
                    max_value=1e-2,
                    sampling="LOG",
                    default=1e-3,
                ),
            ),
            metrics=["mae"],
            experimental_run_tf_function=False,
        )

        return vae
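
# Usage sketch (hypothetical): tuning the VAE with both regularisers via
# keras-tuner's Hyperband. `X_train` is the same assumed array as above.
#
#   from kerastuner.tuners import Hyperband
#
#   tuner = Hyperband(
#       SEQ_2_SEQ_VAE(input_shape=X_train.shape, loss="ELBO+MMD"),
#       objective="val_mae",
#       max_epochs=30,
#       directory="tuner_logs",
#       project_name="seq2seq_vae",
#   )
#   tuner.search(X_train, X_train, epochs=30, validation_split=0.2)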


class SEQ_2_SEQ_MVAE(HyperModel):
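    """Placeholder hypermodel; `build` is not yet implemented."""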
    def __init__(self, input_shape):
        super().__init__()
        self.input_shape = input_shape

    def build(self, hp):
        pass


class SEQ_2_SEQ_MMVAE(HyperModel):
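    """Placeholder hypermodel; `build` is not yet implemented."""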
    def __init__(self, input_shape):
        super().__init__()
        self.input_shape = input_shape

    def build(self, hp):
        pass