# @author lucasmiranda42

from tensorflow.keras import Input, Model, Sequential
from tensorflow.keras.constraints import UnitNorm
from tensorflow.keras.initializers import he_uniform, Orthogonal
from tensorflow.keras.layers import BatchNormalization, Bidirectional, Dense
from tensorflow.keras.layers import Dropout, Lambda, LSTM
from tensorflow.keras.layers import RepeatVector, TimeDistributed
from tensorflow.keras.losses import Huber
from tensorflow.keras.optimizers import Adam
from source.model_utils import *
import tensorflow as tf


class SEQ_2_SEQ_AE:
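    # Simple sequence-to-sequence autoencoder. The encoder stacks a causal Conv1D
    # layer, two bidirectional LSTMs and dense layers down to a bottleneck of size
    # ENCODING; the decoder mirrors it with DenseTranspose layers (from
    # source.model_utils), RepeatVector and bidirectional LSTMs.
    # build() returns the encoder, the decoder and the compiled end-to-end model.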
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        encoder = Sequential(name="SEQ_2_SEQ_Encoder")
        encoder.add(Input(shape=self.input_shape[1:]))
        encoder.add(Model_E0)
        encoder.add(BatchNormalization())
        encoder.add(Model_E1)
        encoder.add(BatchNormalization())
        encoder.add(Model_E2)
        encoder.add(BatchNormalization())
        encoder.add(Model_E3)
        encoder.add(BatchNormalization())
        encoder.add(Dropout(self.DROPOUT_RATE))
        encoder.add(Model_E4)
        encoder.add(BatchNormalization())
        encoder.add(Model_E5)

        # Define and instantiate decoder
        decoder = Sequential(name="SEQ_2_SEQ_Decoder")
        decoder.add(Model_D0)
        decoder.add(BatchNormalization())
        decoder.add(Model_D1)
        decoder.add(BatchNormalization())
        decoder.add(Model_D2)
        decoder.add(BatchNormalization())
        decoder.add(Model_D3)
        decoder.add(BatchNormalization())
        decoder.add(Model_D4)
        decoder.add(BatchNormalization())
        decoder.add(Model_D5)
        decoder.add(TimeDistributed(Dense(self.input_shape[2])))

        model = Sequential([encoder, decoder], name="SEQ_2_SEQ_AE")

        model.compile(
            loss=Huber(reduction="sum", delta=100.0),
            optimizer=Adam(learning_rate=self.learn_rate, clipvalue=0.5,),
            metrics=["mae"],
        )

        return encoder, decoder, model


class SEQ_2_SEQ_VAE:
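    # Variational counterpart of SEQ_2_SEQ_AE. The same encoder stack feeds a latent
    # code parameterised by z_mean and z_log_sigma; if `loss` contains "ELBO" a
    # KL-divergence penalty is applied, and if it contains "MMD" a maximum mean
    # discrepancy penalty is added to the sampled code. build() returns the encoder,
    # a standalone generator (decoder) and the compiled end-to-end VAE.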
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
        loss="ELBO+MMD",
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate
        self.loss = loss

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        x = Input(shape=self.input_shape[1:])
        encoder = Model_E0(x)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E1(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E2(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E3(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Dropout(self.DROPOUT_RATE)(encoder)
        encoder = Model_E4(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E5(encoder)

        z_mean = Dense(self.ENCODING)(encoder)
        z_log_sigma = Dense(self.ENCODING)(encoder)
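        # Variational bottleneck: optionally apply a KL-divergence penalty to
        # (z_mean, z_log_sigma), draw z with the `sampling` helper from model_utils,
        # and optionally add a maximum mean discrepancy (MMD) penalty to the sample.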

        if "ELBO" in self.loss:
            z_mean, z_log_sigma = KLDivergenceLayer()([z_mean, z_log_sigma])

        z = Lambda(sampling)([z_mean, z_log_sigma])

        if "MMD" in self.loss:
            z = MMDiscrepancyLayer()(z)

        # Define and instantiate generator
        generator = Model_D0(z)
        generator = BatchNormalization()(generator)
        generator = Model_D1(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D2(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D3(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D4(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D5(generator)
        x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(generator)

        # end-to-end autoencoder
        encoder = Model(x, z_mean, name="SEQ_2_SEQ_VEncoder")
        vae = Model(x, x_decoded_mean, name="SEQ_2_SEQ_VAE")

        # Build generator as a separate entity
        g = Input(shape=self.ENCODING)
        _generator = Model_D0(g)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D1(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D2(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D3(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D4(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D5(_generator)
        _x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(_generator)
        generator = Model(g, _x_decoded_mean, name="SEQ_2_SEQ_VGenerator")

        def huber_loss(x_, x_decoded_mean_):
            huber = Huber(reduction="sum", delta=100.0)
            return self.input_shape[1:] * huber(x_, x_decoded_mean_)

        vae.compile(
            loss=huber_loss,
            optimizer=Adam(learning_rate=self.learn_rate,),
            metrics=["mae"],
            experimental_run_tf_function=False,
        )

        return encoder, generator, vae


class SEQ_2_SEQ_VAEP:
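    # Variational autoencoder with an auxiliary prediction branch. It shares the
    # encoder and variational bottleneck of SEQ_2_SEQ_VAE, but the latent code feeds
    # two heads: a reconstruction decoder and a predictor with its own LSTM stack,
    # so the compiled model outputs [x_decoded_mean, x_predicted_mean].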
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
        loss="ELBO+MMD",
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate
        self.loss = loss

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
        Model_B1 = BatchNormalization()
        Model_B2 = BatchNormalization()
        Model_B3 = BatchNormalization()
        Model_B4 = BatchNormalization()
        Model_B5 = BatchNormalization()
        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        x = Input(shape=self.input_shape[1:])
        encoder = Model_E0(x)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E1(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E2(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E3(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Dropout(self.DROPOUT_RATE)(encoder)
        encoder = Model_E4(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E5(encoder)

        z_mean = Dense(self.ENCODING)(encoder)
        z_log_sigma = Dense(self.ENCODING)(encoder)

        if "ELBO" in self.loss:
            z_mean, z_log_sigma = KLDivergenceLayer()([z_mean, z_log_sigma])

        z = Lambda(sampling)([z_mean, z_log_sigma])

        if "MMD" in self.loss:
            z = MMDiscrepancyLayer()(z)

        # Define and instantiate generator
        generator = Model_D0(z)
        generator = Model_B1(generator)
        generator = Model_D1(generator)
        generator = Model_B2(generator)
        generator = Model_D2(generator)
        generator = Model_B3(generator)
        generator = Model_D3(generator)
        generator = Model_B4(generator)
        generator = Model_D4(generator)
        generator = Model_B5(generator)
        generator = Model_D5(generator)
        x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(generator)

        # Define and instantiate predictor
        predictor = Model_D0(z)
        predictor = Model_B1(predictor)
        predictor = Model_D1(predictor)
        predictor = Model_B2(predictor)
        predictor = Model_D2(predictor)
        predictor = Model_B3(predictor)
        predictor = Model_D3(predictor)
        predictor = Model_B4(predictor)
        predictor = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )(predictor)
        predictor = BatchNormalization()(predictor)
        predictor = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )(predictor)
        x_predicted_mean = TimeDistributed(Dense(self.input_shape[2]))(predictor)

        # end-to-end autoencoder
        encoder = Model(x, z_mean, name="SEQ_2_SEQ_VEncoder")
        vaep = Model(
            inputs=x, outputs=[x_decoded_mean, x_predicted_mean], name="SEQ_2_SEQ_VAE"
        )

        # Build generator as a separate entity
        g = Input(shape=self.ENCODING)
        _generator = Model_D0(g)
        _generator = Model_B1(_generator)
        _generator = Model_D1(_generator)
        _generator = Model_B2(_generator)
        _generator = Model_D2(_generator)
        _generator = Model_B3(_generator)
        _generator = Model_D3(_generator)
        _generator = Model_B4(_generator)
        _generator = Model_D4(_generator)
        _generator = Model_B5(_generator)
        _generator = Model_D5(_generator)
        _x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(_generator)
        generator = Model(g, _x_decoded_mean, name="SEQ_2_SEQ_VGenerator")

        def huber_loss(x_, x_decoded_mean_):
            huber = Huber(reduction="sum", delta=100.0)
            return self.input_shape[1:] * huber(x_, x_decoded_mean_)

        vaep.compile(
            loss=huber_loss,
            optimizer=Adam(learning_rate=self.learn_rate,),
            metrics=["mae"],
            experimental_run_tf_function=False,
        )

        return encoder, generator, vaep


class SEQ_2_SEQ_MMVAE:
    pass


# TODO next:
#      - MERGE BatchNormalization layers in generator and _generator in SEQ_2_SEQ_VAE
#      - VAE loss function (though this should be analysed later on taking the encodings into account)
#      - Smaller input sliding window (10-15 frames)
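
# Example usage (a minimal sketch; the random data, epochs and batch size below are
# illustrative assumptions, not part of this module):
#
#   import numpy as np
#   data = np.random.uniform(size=(100, 256, 30))  # (samples, timesteps, features)
#
#   encoder, decoder, ae = SEQ_2_SEQ_AE(input_shape=data.shape).build()
#   ae.fit(data, data, epochs=10, batch_size=32)
#
#   encoder, generator, vae = SEQ_2_SEQ_VAE(input_shape=data.shape, loss="ELBO+MMD").build()
#   vae.fit(data, data, epochs=10, batch_size=32)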