# @author lucasmiranda42

from tensorflow.keras import Input, Model, Sequential
from tensorflow.keras.constraints import UnitNorm
from tensorflow.keras.initializers import he_uniform, Orthogonal
from tensorflow.keras.layers import BatchNormalization, Bidirectional, Dense
from tensorflow.keras.layers import Dropout, Lambda, LSTM
from tensorflow.keras.layers import RepeatVector, TimeDistributed
from tensorflow.keras.losses import Huber
from tensorflow.keras.optimizers import Adam
from source.model_utils import *
import tensorflow as tf
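
# model_utils is assumed to provide the custom components used below:
# DenseTranspose (a tied-weights dense layer), UncorrelatedFeaturesConstraint,
# KLDivergenceLayer, MMDiscrepancyLayer, and the sampling function.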


class SEQ_2_SEQ_AE:
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
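        # DenseTranspose (from model_utils) is assumed to mirror the given
        # encoder layer, reusing the transpose of its kernel so that encoder
        # and decoder weights are tied.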
        Model_D0 = DenseTranspose(
            Model_E5,
            activation="relu",
            output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(
            Model_E4,
            activation="relu",
            output_dim=self.DENSE_2,
        )
        Model_D2 = DenseTranspose(
            Model_E3,
            activation="relu",
            output_dim=self.DENSE_1,
        )
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        encoder = Sequential(name="SEQ_2_SEQ_Encoder")
        encoder.add(Input(shape=self.input_shape[1:]))
        encoder.add(Model_E0)
        encoder.add(BatchNormalization())
        encoder.add(Model_E1)
        encoder.add(BatchNormalization())
        encoder.add(Model_E2)
        encoder.add(BatchNormalization())
        encoder.add(Model_E3)
        encoder.add(BatchNormalization())
        encoder.add(Dropout(self.DROPOUT_RATE))
        encoder.add(Model_E4)
        encoder.add(BatchNormalization())
        encoder.add(Model_E5)

        # Define and instantiate decoder
        decoder = Sequential(name="SEQ_2_SEQ_Decoder")
        decoder.add(Model_D0)
        decoder.add(BatchNormalization())
        decoder.add(Model_D1)
        decoder.add(BatchNormalization())
        decoder.add(Model_D2)
        decoder.add(BatchNormalization())
        decoder.add(Model_D3)
        decoder.add(BatchNormalization())
        decoder.add(Model_D4)
        decoder.add(BatchNormalization())
        decoder.add(Model_D5)
        decoder.add(TimeDistributed(Dense(self.input_shape[2])))

        model = Sequential([encoder, decoder], name="SEQ_2_SEQ_AE")

        model.compile(
            loss=Huber(reduction="sum", delta=100.0),
            optimizer=Adam(lr=self.learn_rate, clipvalue=0.5),
            metrics=["mae"],
        )

        return encoder, decoder, model

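# A minimal usage sketch for SEQ_2_SEQ_AE, assuming a preprocessed array of
# shape (samples, window_length, features); the data below is a hypothetical
# placeholder:
#
#     import numpy as np
#     X = np.random.normal(size=(100, 15, 12))
#     encoder, decoder, ae = SEQ_2_SEQ_AE(input_shape=X.shape).build()
#     ae.fit(X, X, epochs=10, batch_size=32)
#     encodings = encoder.predict(X)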

class SEQ_2_SEQ_VAE:
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
        loss="ELBO+MMD",
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate
        self.loss = loss

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
        Model_D0 = DenseTranspose(
            Model_E5,
            activation="relu",
            output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(
            Model_E4,
            activation="relu",
            output_dim=self.DENSE_2,
        )
        Model_D2 = DenseTranspose(
            Model_E3,
            activation="relu",
            output_dim=self.DENSE_1,
        )
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        x = Input(shape=self.input_shape[1:])
        encoder = Model_E0(x)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E1(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E2(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E3(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Dropout(self.DROPOUT_RATE)(encoder)
        encoder = Model_E4(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E5(encoder)

        z_mean = Dense(self.ENCODING)(encoder)
        z_log_sigma = Dense(self.ENCODING)(encoder)

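        # KLDivergenceLayer and MMDiscrepancyLayer (from model_utils) are
        # assumed to register their respective regularization terms via
        # add_loss, which is why compile() below only receives the
        # reconstruction loss.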
        if "ELBO" in self.loss:
            z_mean, z_log_sigma = KLDivergenceLayer()([z_mean, z_log_sigma])

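        # The sampling function (from model_utils) is assumed to implement
        # the standard reparameterization trick, roughly:
        #
        #     def sampling(args):
        #         z_mean, z_log_sigma = args
        #         epsilon = tf.random.normal(shape=tf.shape(z_mean))
        #         return z_mean + tf.exp(0.5 * z_log_sigma) * epsilon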
        z = Lambda(sampling)([z_mean, z_log_sigma])

        if "MMD" in self.loss:
            z = MMDiscrepancyLayer()(z)

        # Define and instantiate generator
        generator = Model_D0(z)
        generator = BatchNormalization()(generator)
        generator = Model_D1(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D2(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D3(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D4(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D5(generator)
        x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(generator)

        # End-to-end autoencoder
        encoder = Model(x, z_mean, name="SEQ_2_SEQ_VEncoder")
        vae = Model(x, x_decoded_mean, name="SEQ_2_SEQ_VAE")

        # Build generator as a separate entity
        g = Input(shape=(self.ENCODING,))
        _generator = Model_D0(g)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D1(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D2(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D3(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D4(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D5(_generator)
        _x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(_generator)
        generator = Model(g, _x_decoded_mean, name="SEQ_2_SEQ_VGenerator")

        def huber_loss(x, x_decoded_mean):
            # Scale the summed Huber reconstruction loss by the sequence
            # dimensions (timesteps, features)
            huber = Huber(reduction="sum", delta=100.0)
            return self.input_shape[1:] * huber(x, x_decoded_mean)

        vae.compile(
            loss=huber_loss,
            optimizer=Adam(lr=self.learn_rate),
            metrics=["mae"],
            experimental_run_tf_function=False,
        )

        return encoder, generator, vae

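# A minimal usage sketch for SEQ_2_SEQ_VAE, under the same hypothetical data
# assumptions as the SEQ_2_SEQ_AE example above:
#
#     encoder, generator, vae = SEQ_2_SEQ_VAE(input_shape=X.shape).build()
#     vae.fit(X, X, epochs=10, batch_size=32)
#     latent = encoder.predict(X)               # per-window latent means
#     reconstructions = generator.predict(latent)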

class SEQ_2_SEQ_VAME:
    pass


class SEQ_2_SEQ_MMVAE:
    pass

# TODO next:
#      - VAE loss function (though this should be analysed later on taking the encodings into account)
#      - Smaller input sliding window (10-15 frames)