# @author lucasmiranda42

from tensorflow.keras import backend as K
from tensorflow.keras import Input, Model, Sequential
from tensorflow.keras.callbacks import LambdaCallback
from tensorflow.keras.constraints import UnitNorm
from tensorflow.keras.initializers import he_uniform, Orthogonal
from tensorflow.keras.layers import BatchNormalization, Bidirectional, Dense
from tensorflow.keras.layers import Dropout, Lambda, LSTM
from tensorflow.keras.layers import RepeatVector, TimeDistributed
from tensorflow.keras.losses import Huber
from tensorflow.keras.optimizers import Adam
from source.model_utils import *
import tensorflow as tf


class SEQ_2_SEQ_AE:
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
        # DenseTranspose layers (defined in source.model_utils) build the
        # decoder as a mirror of the encoder Dense stack
        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        encoder = Sequential(name="SEQ_2_SEQ_Encoder")
        encoder.add(Input(shape=self.input_shape[1:]))
        encoder.add(Model_E0)
        encoder.add(BatchNormalization())
        encoder.add(Model_E1)
        encoder.add(BatchNormalization())
        encoder.add(Model_E2)
        encoder.add(BatchNormalization())
        encoder.add(Model_E3)
        encoder.add(BatchNormalization())
        encoder.add(Dropout(self.DROPOUT_RATE))
        encoder.add(Model_E4)
        encoder.add(BatchNormalization())
        encoder.add(Model_E5)

        # Define and instantiate decoder
        decoder = Sequential(name="SEQ_2_SEQ_Decoder")
        decoder.add(Model_D0)
        decoder.add(BatchNormalization())
        decoder.add(Model_D1)
        decoder.add(BatchNormalization())
        decoder.add(Model_D2)
        decoder.add(BatchNormalization())
        decoder.add(Model_D3)
        decoder.add(Model_D4)
        decoder.add(BatchNormalization())
        decoder.add(Model_D5)
        decoder.add(TimeDistributed(Dense(self.input_shape[2])))

        model = Sequential([encoder, decoder], name="SEQ_2_SEQ_AE")

        model.compile(
            loss=Huber(reduction="sum", delta=100.0),
            optimizer=Adam(lr=self.learn_rate, clipvalue=0.5,),
            metrics=["mae"],
        )

        return encoder, decoder, model

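# A minimal usage sketch for SEQ_2_SEQ_AE (illustrative; the array shapes and
# training settings below are assumptions, not part of the original module):
#
#   import numpy as np
#
#   data = np.random.normal(size=(100, 24, 10))  # (samples, timesteps, features)
#   encoder, decoder, model = SEQ_2_SEQ_AE(input_shape=data.shape).build()
#   model.fit(data, data, batch_size=16, epochs=10)  # autoencoder: input == target
#   latent = encoder.predict(data)                   # (100, ENCODING) embeddings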

class SEQ_2_SEQ_VAE:
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
        loss="ELBO+MMD",
        kl_warmup_epochs=0,
        mmd_warmup_epochs=0,
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate
        self.loss = loss
        self.kl_warmup = kl_warmup_epochs
        self.mmd_warmup = mmd_warmup_epochs

        assert (
            "ELBO" in self.loss or "MMD" in self.loss
        ), "loss must be one of ELBO, MMD or ELBO+MMD (default)"

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
        Model_B1 = BatchNormalization()
        Model_B2 = BatchNormalization()
        Model_B3 = BatchNormalization()
        Model_B4 = BatchNormalization()
        Model_B5 = BatchNormalization()
        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        x = Input(shape=self.input_shape[1:])
        encoder = Model_E0(x)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E1(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E2(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E3(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Dropout(self.DROPOUT_RATE)(encoder)
        encoder = Model_E4(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E5(encoder)

        # Latent posterior parameters: mean and log-sigma of q(z|x)
        z_mean = Dense(self.ENCODING)(encoder)
        z_log_sigma = Dense(self.ENCODING)(encoder)

        # Define and control custom loss functions
        kl_wu = False
        if "ELBO" in self.loss:

            # kl_beta must be a backend variable so the warmup callback can
            # update it in place during training (a plain Python float would
            # be captured by value and never change)
            kl_beta = K.variable(1.0, name="kl_beta")
            if self.kl_warmup:

                def klwarmup(epoch):
                    K.set_value(kl_beta, K.min([epoch / self.kl_warmup, 1]))

                kl_wu = LambdaCallback(
                    on_epoch_begin=lambda epoch, log: klwarmup(epoch)
                )

            z_mean, z_log_sigma = KLDivergenceLayer(beta=kl_beta)([z_mean, z_log_sigma])

        # Reparameterization trick: sample z from the approximate posterior
        z = Lambda(sampling)([z_mean, z_log_sigma])

        mmd_wu = False
        if "MMD" in self.loss:

            mmd_beta = K.variable(1.0, name="mmd_beta")
            if self.mmd_warmup:

                def mmdwarmup(epoch):
                    K.set_value(mmd_beta, K.min([epoch / self.mmd_warmup, 1]))

                mmd_wu = LambdaCallback(
                    on_epoch_begin=lambda epoch, log: mmdwarmup(epoch)
                )

            z = MMDiscrepancyLayer(beta=mmd_beta)(z)

        # Define and instantiate generator
        generator = Model_D0(z)
        generator = Model_B1(generator)
        generator = Model_D1(generator)
        generator = Model_B2(generator)
        generator = Model_D2(generator)
        generator = Model_B3(generator)
        generator = Model_D3(generator)
        generator = Model_D4(generator)
        generator = Model_B4(generator)
        generator = Model_D5(generator)
        generator = Model_B5(generator)
        x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(generator)

        # end-to-end autoencoder
        encoder = Model(x, z_mean, name="SEQ_2_SEQ_VEncoder")
        vae = Model(x, x_decoded_mean, name="SEQ_2_SEQ_VAE")

        # Build generator as a separate entity
        g = Input(shape=(self.ENCODING,))
        _generator = Model_D0(g)
        _generator = Model_B1(_generator)
        _generator = Model_D1(_generator)
        _generator = Model_B2(_generator)
        _generator = Model_D2(_generator)
        _generator = Model_B3(_generator)
        _generator = Model_D3(_generator)
        _generator = Model_D4(_generator)
        _generator = Model_B4(_generator)
        _generator = Model_D5(_generator)
        _generator = Model_B5(_generator)
        _x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(_generator)
        generator = Model(g, _x_decoded_mean, name="SEQ_2_SEQ_VGenerator")

        def huber_loss(x_, x_decoded_mean_):
            huber = Huber(reduction="sum", delta=100.0)
            return self.input_shape[1:] * huber(x_, x_decoded_mean_)

        vae.compile(
            loss=huber_loss,
            optimizer=Adam(lr=self.learn_rate,),
            metrics=["mae"],
            experimental_run_tf_function=False,
        )

        return encoder, generator, vae, kl_wu, mmd_wu

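# A minimal usage sketch for SEQ_2_SEQ_VAE (illustrative; shapes, warmup
# settings and the latent dimensionality below are assumptions):
#
#   import numpy as np
#
#   data = np.random.normal(size=(100, 24, 10))
#   encoder, generator, vae, kl_wu, mmd_wu = SEQ_2_SEQ_VAE(
#       input_shape=data.shape, kl_warmup_epochs=5, mmd_warmup_epochs=5
#   ).build()
#   callbacks = [cb for cb in (kl_wu, mmd_wu) if cb]  # warmup callbacks, if any
#   vae.fit(data, data, batch_size=16, epochs=20, callbacks=callbacks)
#   fake = generator.predict(np.random.normal(size=(10, 32)))  # 32 == ENCODING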

class SEQ_2_SEQ_VAEP:
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
        loss="ELBO+MMD",
        kl_warmup_epochs=0,
        mmd_warmup_epochs=0,
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate
        self.loss = loss
        self.kl_warmup = kl_warmup_epochs
        self.mmd_warmup = mmd_warmup_epochs

        assert (
            "ELBO" in self.loss or "MMD" in self.loss
        ), "loss must be one of ELBO, MMD or ELBO+MMD (default)"

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
        Model_B1 = BatchNormalization()
        Model_B2 = BatchNormalization()
        Model_B3 = BatchNormalization()
        Model_B4 = BatchNormalization()
        Model_B5 = BatchNormalization()
        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        x = Input(shape=self.input_shape[1:])
        encoder = Model_E0(x)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E1(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E2(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E3(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Dropout(self.DROPOUT_RATE)(encoder)
        encoder = Model_E4(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E5(encoder)

        z_mean = Dense(self.ENCODING)(encoder)
        z_log_sigma = Dense(self.ENCODING)(encoder)

        # Define and control custom loss functions
        kl_wu = False
        if "ELBO" in self.loss:
            # As above: a backend variable, so the callback can anneal it
            kl_beta = K.variable(1.0, name="kl_beta")
            if self.kl_warmup:

                def klwarmup(epoch):
                    K.set_value(kl_beta, K.min([epoch / self.kl_warmup, 1]))

                kl_wu = LambdaCallback(
                    on_epoch_begin=lambda epoch, log: klwarmup(epoch)
                )

            z_mean, z_log_sigma = KLDivergenceLayer(beta=kl_beta)([z_mean, z_log_sigma])

        z = Lambda(sampling)([z_mean, z_log_sigma])

        mmd_wu = False
        if "MMD" in self.loss:

            mmd_beta = K.variable(1.0, name="mmd_beta")
            if self.mmd_warmup:

                def mmdwarmup(epoch):
                    K.set_value(mmd_beta, K.min([epoch / self.mmd_warmup, 1]))

                mmd_wu = LambdaCallback(
                    on_epoch_begin=lambda epoch, log: mmdwarmup(epoch)
                )

            z = MMDiscrepancyLayer(beta=mmd_beta)(z)

        # Define and instantiate generator
        generator = Model_D0(z)
        generator = Model_B1(generator)
        generator = Model_D1(generator)
        generator = Model_B2(generator)
        generator = Model_D2(generator)
        generator = Model_B3(generator)
        generator = Model_D3(generator)
        generator = Model_D4(generator)
        generator = Model_B4(generator)
        generator = Model_D5(generator)
        generator = Model_B5(generator)
        x_decoded_mean = TimeDistributed(
            Dense(self.input_shape[2]), name="vaep_reconstruction"
        )(generator)

        # Define and instantiate predictor
        predictor = Dense(
            self.ENCODING, activation="relu", kernel_initializer=he_uniform()
        )(z)
        predictor = BatchNormalization()(predictor)
        predictor = Dense(
            self.DENSE_2, activation="relu", kernel_initializer=he_uniform()
        )(predictor)
        predictor = BatchNormalization()(predictor)
        predictor = Dense(
            self.DENSE_1, activation="relu", kernel_initializer=he_uniform()
        )(predictor)
        predictor = BatchNormalization()(predictor)
        predictor = RepeatVector(self.input_shape[1])(predictor)
        predictor = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )(predictor)
        predictor = BatchNormalization()(predictor)
        predictor = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )(predictor)
        predictor = BatchNormalization()(predictor)
        x_predicted_mean = TimeDistributed(
            Dense(self.input_shape[2]), name="vaep_prediction"
        )(predictor)

        # end-to-end autoencoder
        encoder = Model(x, z_mean, name="SEQ_2_SEQ_VEncoder")
        vaep = Model(
            inputs=x, outputs=[x_decoded_mean, x_predicted_mean], name="SEQ_2_SEQ_VAEP"
        )

        # Build generator as a separate entity
        g = Input(shape=(self.ENCODING,))
        _generator = Model_D0(g)
        _generator = Model_B1(_generator)
        _generator = Model_D1(_generator)
        _generator = Model_B2(_generator)
        _generator = Model_D2(_generator)
        _generator = Model_B3(_generator)
        _generator = Model_D3(_generator)
        _generator = Model_D4(_generator)
        _generator = Model_B4(_generator)
        _generator = Model_D5(_generator)
        _generator = Model_B5(_generator)
        _x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(_generator)
        generator = Model(g, _x_decoded_mean, name="SEQ_2_SEQ_VGenerator")

        def huber_loss(x_, x_decoded_mean_):
            huber = Huber(reduction="sum", delta=100.0)
            return self.input_shape[1:] * huber(x_, x_decoded_mean_)

        vaep.compile(
            loss=huber_loss,
            optimizer=Adam(lr=self.learn_rate,),
            metrics=["mae"],
            experimental_run_tf_function=False,
        )

        return encoder, generator, vaep, kl_wu, mmd_wu

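# A minimal usage sketch for SEQ_2_SEQ_VAEP (illustrative assumptions only):
# the model has two outputs (reconstruction and prediction branches), so
# fit() expects two targets. Both are set to the input here for brevity; in
# practice the second target would be the sequence the predictor should
# anticipate (e.g. a time-shifted window).
#
#   import numpy as np
#
#   data = np.random.normal(size=(100, 24, 10))
#   encoder, generator, vaep, kl_wu, mmd_wu = SEQ_2_SEQ_VAEP(
#       input_shape=data.shape
#   ).build()
#   vaep.fit(data, [data, data], batch_size=16, epochs=10)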

class SEQ_2_SEQ_MMVAE:
    pass


# TODO:
#       - Gaussian Mixture + Categorical priors -> Deep Clustering
#       - free bits paper
#       - Attention mechanism for encoder / decoder (does it make sense?)
#       - Transformer encoder/decoder (does it make sense?)