# @author lucasmiranda42

from tensorflow.keras import Input, Model, Sequential
from tensorflow.keras.constraints import UnitNorm
from tensorflow.keras.initializers import he_uniform, Orthogonal
from tensorflow.keras.layers import BatchNormalization, Bidirectional, Dense
from tensorflow.keras.layers import Dropout, Lambda, LSTM
from tensorflow.keras.layers import RepeatVector, TimeDistributed
from tensorflow.keras.losses import BinaryCrossentropy, Huber
from tensorflow.keras.optimizers import Adam
from source.model_utils import *
import tensorflow as tf


class SEQ_2_SEQ_AE:
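    """Simple sequence-to-sequence autoencoder.

    The encoder stacks a causal Conv1D, two bidirectional LSTMs and two Dense
    layers before projecting onto a bottleneck of size ENCODING; the decoder
    mirrors it with DenseTranspose layers (from source.model_utils) and
    bidirectional LSTMs. build() returns the encoder, the decoder, and the
    compiled end-to-end model.
    """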
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
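        # Decoder Dense layers are built with DenseTranspose (from
        # source.model_utils), which presumably reuses each encoder layer's
        # weights in transposed form, tying encoder and decoder parameters.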
        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        encoder = Sequential(name="SEQ_2_SEQ_Encoder")
        encoder.add(Input(shape=self.input_shape[1:]))
        encoder.add(Model_E0)
        encoder.add(BatchNormalization())
        encoder.add(Model_E1)
        encoder.add(BatchNormalization())
        encoder.add(Model_E2)
        encoder.add(BatchNormalization())
        encoder.add(Model_E3)
        encoder.add(BatchNormalization())
        encoder.add(Dropout(self.DROPOUT_RATE))
        encoder.add(Model_E4)
        encoder.add(BatchNormalization())
        encoder.add(Model_E5)

        # Define and instantiate decoder
        decoder = Sequential(name="SEQ_2_SEQ_Decoder")
        decoder.add(Model_D0)
        decoder.add(BatchNormalization())
        decoder.add(Model_D1)
        decoder.add(BatchNormalization())
        decoder.add(Model_D2)
        decoder.add(BatchNormalization())
        decoder.add(Model_D3)
        decoder.add(Model_D4)
        decoder.add(BatchNormalization())
        decoder.add(Model_D5)
        decoder.add(TimeDistributed(Dense(self.input_shape[2])))

        model = Sequential([encoder, decoder], name="SEQ_2_SEQ_AE")

        model.compile(
            loss="binary_crossentropy",  # Huber(reduction="sum", delta=100.0),
            optimizer=Adam(learning_rate=self.learn_rate, clipvalue=0.5),
            metrics=["mae"],
        )

        return encoder, decoder, model


class SEQ_2_SEQ_VAE:
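    """Sequence-to-sequence variational autoencoder.

    Shares its layer stack with SEQ_2_SEQ_AE, but encodes each input as a
    distribution (z_mean, z_log_sigma) and regularizes the latent space with
    a KL divergence ("ELBO") and/or maximum mean discrepancy ("MMD") term,
    selected through the loss argument. build() returns the encoder, a
    standalone generator, and the compiled end-to-end model.
    """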
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
        loss="ELBO+MMD",
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate
        self.loss = loss

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
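        # Batch normalization layers are instantiated once and kept by name,
        # so the standalone generator defined below reuses the exact same
        # (trained) layers as the end-to-end VAE.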
        Model_B1 = BatchNormalization()
        Model_B2 = BatchNormalization()
        Model_B3 = BatchNormalization()
        Model_B4 = BatchNormalization()
        Model_B5 = BatchNormalization()
        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        x = Input(shape=self.input_shape[1:])
        encoder = Model_E0(x)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E1(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E2(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E3(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Dropout(self.DROPOUT_RATE)(encoder)
        encoder = Model_E4(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E5(encoder)

        z_mean = Dense(self.ENCODING)(encoder)
        z_log_sigma = Dense(self.ENCODING)(encoder)
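        # z_mean and z_log_sigma parameterize the approximate posterior.
        # Depending on self.loss, a KL divergence ("ELBO") and/or a maximum
        # mean discrepancy ("MMD") penalty is attached through the custom
        # layers from source.model_utils; `sampling` is assumed to implement
        # the usual reparameterization trick.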

        if "ELBO" in self.loss:
            z_mean, z_log_sigma = KLDivergenceLayer()([z_mean, z_log_sigma])

        z = Lambda(sampling)([z_mean, z_log_sigma])

        if "MMD" in self.loss:
            z = MMDiscrepancyLayer()(z)

        # Define and instantiate generator
        generator = Model_D0(z)
        generator = Model_B1(generator)
        generator = Model_D1(generator)
        generator = Model_B2(generator)
        generator = Model_D2(generator)
        generator = Model_B3(generator)
        generator = Model_D3(generator)
        generator = Model_D4(generator)
        generator = Model_B4(generator)
        generator = Model_D5(generator)
        generator = Model_B5(generator)
        x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(generator)

        # end-to-end autoencoder
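        # The encoder exposed to the caller maps inputs to z_mean, i.e. the
        # deterministic part of the embedding, rather than a random sample z.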
        encoder = Model(x, z_mean, name="SEQ_2_SEQ_VEncoder")
        vae = Model(x, x_decoded_mean, name="SEQ_2_SEQ_VAE")

        # Build generator as a separate entity
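        # Model_D0-D5 and Model_B1-B5 are the same layer objects used above,
        # so this standalone generator shares its weights with the VAE decoder.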
        g = Input(shape=(self.ENCODING,))
        _generator = Model_D0(g)
        _generator = Model_B1(_generator)
        _generator = Model_D1(_generator)
        _generator = Model_B2(_generator)
        _generator = Model_D2(_generator)
        _generator = Model_B3(_generator)
        _generator = Model_D3(_generator)
        _generator = Model_D4(_generator)
        _generator = Model_B4(_generator)
        _generator = Model_D5(_generator)
        _generator = Model_B5(_generator)
        _x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(_generator)
        generator = Model(g, _x_decoded_mean, name="SEQ_2_SEQ_VGenerator")

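        # Sum-reduced Huber loss scaled by the input dimensions; currently
        # unused, since compile() below sets binary cross-entropy instead.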
        def huber_loss(x_, x_decoded_mean_):
            huber = Huber(reduction="sum", delta=100.0)
            return self.input_shape[1:] * huber(x_, x_decoded_mean_)

        vae.compile(
            loss="binary_crossentropy",  # huber_loss,
            optimizer=Adam(learning_rate=self.learn_rate),
            metrics=["mae"],
            experimental_run_tf_function=False,
        )

        return encoder, generator, vae


class SEQ_2_SEQ_VAEP:
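    """Variational autoencoder with an auxiliary prediction branch.

    Identical to SEQ_2_SEQ_VAE, except that a second decoder head is grown
    from the same latent code, so the model is trained on two targets of the
    input's shape (reconstruction and, presumably, the next sliding window).
    build() returns the encoder, a standalone generator, and the compiled
    end-to-end model.
    """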
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
        loss="ELBO+MMD",
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate
        self.loss = loss

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
        Model_B1 = BatchNormalization()
        Model_B2 = BatchNormalization()
        Model_B3 = BatchNormalization()
        Model_B4 = BatchNormalization()
        Model_B5 = BatchNormalization()
        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        x = Input(shape=self.input_shape[1:])
        encoder = Model_E0(x)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E1(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E2(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E3(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Dropout(self.DROPOUT_RATE)(encoder)
        encoder = Model_E4(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E5(encoder)

        z_mean = Dense(self.ENCODING)(encoder)
        z_log_sigma = Dense(self.ENCODING)(encoder)

        if "ELBO" in self.loss:
            z_mean, z_log_sigma = KLDivergenceLayer()([z_mean, z_log_sigma])

        z = Lambda(sampling)([z_mean, z_log_sigma])

        if "MMD" in self.loss:
            z = MMDiscrepancyLayer()(z)

        # Define and instantiate generator
        generator = Model_D0(z)
        generator = Model_B1(generator)
        generator = Model_D1(generator)
        generator = Model_B2(generator)
        generator = Model_D2(generator)
        generator = Model_B3(generator)
        generator = Model_D3(generator)
        generator = Model_D4(generator)
        generator = Model_B4(generator)
        generator = Model_D5(generator)
        generator = Model_B5(generator)
        x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(generator)

        # Define and instantiate predictor
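        # The predictor decodes the same latent sample z through a Dense stack
        # and two bidirectional LSTMs, producing a second sequence with the
        # input's shape (presumably trained against the second target passed
        # at fit time).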
        predictor = Dense(
            self.ENCODING, activation="relu", kernel_initializer=he_uniform()
        )(z)
        predictor = BatchNormalization()(predictor)
        predictor = Dense(
            self.DENSE_2, activation="relu", kernel_initializer=he_uniform()
        )(predictor)
        predictor = BatchNormalization()(predictor)
        predictor = Dense(
            self.DENSE_1, activation="relu", kernel_initializer=he_uniform()
        )(predictor)
        predictor = BatchNormalization()(predictor)
        predictor = RepeatVector(self.input_shape[1])(predictor)
        predictor = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )(predictor)
        predictor = BatchNormalization()(predictor)
        predictor = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )(predictor)
        predictor = BatchNormalization()(predictor)
        x_predicted_mean = TimeDistributed(Dense(self.input_shape[2]))(predictor)

        # end-to-end autoencoder
        encoder = Model(x, z_mean, name="SEQ_2_SEQ_VEncoder")
        vaep = Model(
            inputs=x, outputs=[x_decoded_mean, x_predicted_mean], name="SEQ_2_SEQ_VAEP"
        )

        # Build generator as a separate entity
        g = Input(shape=(self.ENCODING,))
        _generator = Model_D0(g)
        _generator = Model_B1(_generator)
        _generator = Model_D1(_generator)
        _generator = Model_B2(_generator)
        _generator = Model_D2(_generator)
        _generator = Model_B3(_generator)
        _generator = Model_D3(_generator)
        _generator = Model_D4(_generator)
        _generator = Model_B4(_generator)
        _generator = Model_D5(_generator)
        _generator = Model_B5(_generator)
        _x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(_generator)
        generator = Model(g, _x_decoded_mean, name="SEQ_2_SEQ_VGenerator")

        def huber_loss(x_, x_decoded_mean_):
            huber = Huber(reduction="sum", delta=100.0)
            return self.input_shape[1:] * huber(x_, x_decoded_mean_)

        vaep.compile(
            loss="binary_crossentropy",
            optimizer=Adam(learning_rate=self.learn_rate),
            metrics=["mae"],
            experimental_run_tf_function=False,
        )

        return encoder, generator, vaep


class SEQ_2_SEQ_MMVAE:
    pass


# TODO next:
#      - MERGE BatchNormalization layers in generator and _generator in SEQ_2_SEQ_VAE
#      - VAE loss function (though this should be analysed later on taking the encodings into account)
#      - Smaller input sliding window (10-15 frames)
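

# Minimal usage sketch (illustrative only; the array shape and hyperparameters
# are hypothetical placeholders). Both models expect data shaped
# (samples, window_length, features), scaled to [0, 1] to match the binary
# cross-entropy reconstruction loss compiled above.
if __name__ == "__main__":
    import numpy as np

    X = np.random.uniform(size=(1000, 24, 28)).astype("float32")

    # Plain autoencoder: reconstruct the input window from the bottleneck
    encoder, decoder, ae = SEQ_2_SEQ_AE(input_shape=X.shape).build()
    ae.fit(X, X, batch_size=256, epochs=1)

    # Variational autoencoder regularized with both ELBO and MMD terms
    encoder, generator, vae = SEQ_2_SEQ_VAE(X.shape, loss="ELBO+MMD").build()
    vae.fit(X, X, batch_size=256, epochs=1)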