# @author lucasmiranda42

from tensorflow.keras import Input, Model, Sequential
from tensorflow.keras.constraints import UnitNorm
from tensorflow.keras.initializers import he_uniform, Orthogonal
from tensorflow.keras.layers import BatchNormalization, Bidirectional, Dense
from tensorflow.keras.layers import Dropout, Lambda, LSTM
from tensorflow.keras.layers import RepeatVector, TimeDistributed
from tensorflow.keras.losses import Huber
from tensorflow.keras.optimizers import Adam
from source.model_utils import *
import tensorflow as tf


class SEQ_2_SEQ_AE:
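    """Sequence-to-sequence autoencoder.

    A causal Conv1D plus bidirectional-LSTM encoder compresses each input
    window into a dense ENCODING-dimensional code; a mirrored decoder built
    from DenseTranspose layers (imported from source.model_utils) and
    bidirectional LSTMs reconstructs the sequence. build() returns the
    encoder, the decoder and the compiled end-to-end model.
    """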
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers
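        # DenseTranspose (imported from source.model_utils) presumably mirrors
        # the Dense layer it receives, so the decoder reuses the encoder's
        # dense weights in transposed form.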
        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        encoder = Sequential(name="SEQ_2_SEQ_Encoder")
        encoder.add(Input(shape=self.input_shape[1:]))
        encoder.add(Model_E0)
        encoder.add(BatchNormalization())
        encoder.add(Model_E1)
        encoder.add(BatchNormalization())
        encoder.add(Model_E2)
        encoder.add(BatchNormalization())
        encoder.add(Model_E3)
        encoder.add(BatchNormalization())
        encoder.add(Dropout(self.DROPOUT_RATE))
        encoder.add(Model_E4)
        encoder.add(BatchNormalization())
        encoder.add(Model_E5)

        # Define and instantiate decoder
        decoder = Sequential(name="SEQ_2_SEQ_Decoder")
        decoder.add(Model_D0)
        decoder.add(BatchNormalization())
        decoder.add(Model_D1)
        decoder.add(BatchNormalization())
        decoder.add(Model_D2)
        decoder.add(BatchNormalization())
        decoder.add(Model_D3)
        decoder.add(BatchNormalization())
        decoder.add(Model_D4)
        decoder.add(BatchNormalization())
        decoder.add(Model_D5)
        decoder.add(TimeDistributed(Dense(self.input_shape[2])))

        model = Sequential([encoder, decoder], name="SEQ_2_SEQ_AE")

        model.compile(
            loss=Huber(reduction="sum", delta=100.0),
            optimizer=Adam(lr=self.learn_rate, clipvalue=0.5,),
            metrics=["mae"],
        )

        return encoder, decoder, model


class SEQ_2_SEQ_VAE:
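    """Sequence-to-sequence variational autoencoder.

    Shares the encoder/decoder stacks of SEQ_2_SEQ_AE, but routes the
    bottleneck through a sampled latent vector. The `loss` string selects the
    latent regularisers ("ELBO" for a KL-divergence term, "MMD" for a
    maximum-mean-discrepancy term). build() returns the encoder, a standalone
    generator and the compiled VAE.
    """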
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
        loss="ELBO+MMD",
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate
        self.loss = loss

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers

        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        x = Input(shape=self.input_shape[1:])
        encoder = Model_E0(x)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E1(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E2(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E3(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Dropout(self.DROPOUT_RATE)(encoder)
        encoder = Model_E4(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E5(encoder)

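        # Variational bottleneck: two parallel Dense heads parametrise the
        # latent distribution (mean and log-sigma) from which z is sampled below.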
        z_mean = Dense(self.ENCODING)(encoder)
        z_log_sigma = Dense(self.ENCODING)(encoder)

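        # The loss string toggles the latent regularisers; KLDivergenceLayer and
        # MMDiscrepancyLayer come from source.model_utils and presumably register
        # their penalties on the model as they are applied.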
        if "ELBO" in self.loss:
            z_mean, z_log_sigma = KLDivergenceLayer()([z_mean, z_log_sigma])

        z = Lambda(sampling)([z_mean, z_log_sigma])

        if "MMD" in self.loss:
            z = MMDiscrepancyLayer()(z)

        # Define and instantiate generator
        generator = Model_D0(z)
        generator = BatchNormalization()(generator)
        generator = Model_D1(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D2(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D3(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D4(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D5(generator)
        x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(generator)

        # end-to-end autoencoder
        encoder = Model(x, z_mean, name="SEQ_2_SEQ_VEncoder")
        vae = Model(x, x_decoded_mean, name="SEQ_2_SEQ_VAE")

        # Build generator as a separate entity
        g = Input(shape=self.ENCODING)
        _generator = Model_D0(g)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D1(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D2(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D3(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D4(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D5(_generator)
        _x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(_generator)
        generator = Model(g, _x_decoded_mean, name="SEQ_2_SEQ_VGenerator")

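        # Reconstruction term: a summed Huber loss scaled by the non-batch
        # input dimensions.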
        def huber_loss(x_, x_decoded_mean_):
            huber = Huber(reduction="sum", delta=100.0)
            return self.input_shape[1:] * huber(x_, x_decoded_mean_)

        vae.compile(
            loss=huber_loss,
            optimizer=Adam(lr=self.learn_rate,),
            metrics=["mae"],
            experimental_run_tf_function=False,
        )

        return encoder, generator, vae


class SEQ_2_SEQ_VAEP:
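    """Variant of SEQ_2_SEQ_VAE.

    As currently written, the layer stack and losses are identical to
    SEQ_2_SEQ_VAE; build() returns the encoder, a standalone generator and
    the compiled model.
    """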
    def __init__(
        self,
        input_shape,
        CONV_filters=256,
        LSTM_units_1=256,
        LSTM_units_2=64,
        DENSE_2=64,
        DROPOUT_RATE=0.25,
        ENCODING=32,
        learn_rate=1e-3,
        loss="ELBO+MMD",
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate
        self.loss = loss

    def build(self):
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            kernel_initializer=he_uniform(),
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E4 = Dense(
            self.DENSE_2,
            activation="relu",
            kernel_constraint=UnitNorm(axis=0),
            kernel_initializer=he_uniform(),
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
            kernel_initializer=Orthogonal(),
        )

        # Decoder layers

        Model_D0 = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING,
        )
        Model_D1 = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2,)
        Model_D2 = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1,)
        Model_D3 = RepeatVector(self.input_shape[1])
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        x = Input(shape=self.input_shape[1:])
        encoder = Model_E0(x)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E1(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E2(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E3(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Dropout(self.DROPOUT_RATE)(encoder)
        encoder = Model_E4(encoder)
        encoder = BatchNormalization()(encoder)
        encoder = Model_E5(encoder)

        z_mean = Dense(self.ENCODING)(encoder)
        z_log_sigma = Dense(self.ENCODING)(encoder)

        if "ELBO" in self.loss:
            z_mean, z_log_sigma = KLDivergenceLayer()([z_mean, z_log_sigma])

        z = Lambda(sampling)([z_mean, z_log_sigma])

        if "MMD" in self.loss:
            z = MMDiscrepancyLayer()(z)

        # Define and instantiate generator
        generator = Model_D0(z)
        generator = BatchNormalization()(generator)
        generator = Model_D1(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D2(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D3(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D4(generator)
        generator = BatchNormalization()(generator)
        generator = Model_D5(generator)
        x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(generator)

        # end-to-end autoencoder
        encoder = Model(x, z_mean, name="SEQ_2_SEQ_VEncoder")
        vaep = Model(x, x_decoded_mean, name="SEQ_2_SEQ_VAEP")

        # Build generator as a separate entity
        g = Input(shape=self.ENCODING)
        _generator = Model_D0(g)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D1(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D2(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D3(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D4(_generator)
        _generator = BatchNormalization()(_generator)
        _generator = Model_D5(_generator)
        _x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(_generator)
        generator = Model(g, _x_decoded_mean, name="SEQ_2_SEQ_VGenerator")

        def huber_loss(x_, x_decoded_mean_):
            huber = Huber(reduction="sum", delta=100.0)
            return self.input_shape[1:] * huber(x_, x_decoded_mean_)

        vaep.compile(
            loss=huber_loss,
            optimizer=Adam(lr=self.learn_rate,),
            metrics=["mae"],
            experimental_run_tf_function=False,
        )

        return encoder, generator, vaep


class SEQ_2_SEQ_MMVAE:
    pass


# TODO next:
#      - VAE loss function (though this should be analysed later on taking the encodings into account)
#      - Smaller input sliding window (10-15 frames)
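

# Usage sketch (illustrative only; the input shape below is an assumption,
# not taken from the accompanying training scripts). Each class is constructed
# with the shape of the windowed input array, and build() returns the
# sub-models plus the compiled end-to-end model.
if __name__ == "__main__":
    # Hypothetical input: 100 windows of 50 frames with 24 features each.
    ae_encoder, ae_decoder, ae_model = SEQ_2_SEQ_AE(input_shape=(100, 50, 24)).build()
    ae_model.summary()

    vae_encoder, vae_generator, vae_model = SEQ_2_SEQ_VAE(
        input_shape=(100, 50, 24), loss="ELBO+MMD"
    ).build()
    vae_model.summary()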