# @author lucasmiranda42

from tensorflow.keras import Input, Model, Sequential
from tensorflow.keras.constraints import UnitNorm
from tensorflow.keras.layers import Bidirectional, Dense, Dropout
from tensorflow.keras.layers import Lambda, LSTM
from tensorflow.keras.layers import RepeatVector, TimeDistributed
from tensorflow.keras.losses import Huber
from tensorflow.keras.optimizers import Adam
# model_utils provides DenseTranspose, UncorrelatedFeaturesConstraint,
# KLDivergenceLayer, MMDiscrepancyLayer and sampling, all used below
from source.model_utils import *
import tensorflow as tf


class SEQ_2_SEQ_AE:
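    """Sequence-to-sequence autoencoder: a causal Conv1D + bidirectional LSTM
    encoder compresses the input sequences into a bottleneck of size ENCODING,
    and a tied-weights (DenseTranspose) decoder reconstructs them."""
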
    def __init__(
        self,
        input_shape,
        CONV_filters,
        LSTM_units_1,
        LSTM_units_2,
        DENSE_2,
        DROPOUT_RATE,
        ENCODING,
        learn_rate,
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate

    def build(self):
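        """Builds encoder and decoder as Sequential models and returns the
        compiled end-to-end autoencoder."""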
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
            input_shape=self.input_shape[1:],
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1, activation="relu", kernel_constraint=UnitNorm(axis=0)
        )
        Model_E4 = Dense(
            self.DENSE_2, activation="relu", kernel_constraint=UnitNorm(axis=0)
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
        )

        # Decoder layers
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        encoder = Sequential(name="DLC_encoder")
        encoder.add(Model_E0)
        encoder.add(Model_E1)
        encoder.add(Model_E2)
        encoder.add(Model_E3)
        encoder.add(Dropout(self.DROPOUT_RATE))
        encoder.add(Model_E4)
        encoder.add(Model_E5)

        # Define and instantiate decoder (tied weights: each DenseTranspose
        # layer reuses the transposed kernel of the encoder layer it wraps).
        # The hardcoded output_dims (64, 128, 256) appear to assume DENSE_2=64
        # and LSTM_units_2=128.
        decoder = Sequential(name="DLC_Decoder")
        decoder.add(
            DenseTranspose(
                Model_E5, activation="relu", input_shape=(self.ENCODING,), output_dim=64
            )
        )
        decoder.add(DenseTranspose(Model_E4, activation="relu", output_dim=128))
        decoder.add(DenseTranspose(Model_E3, activation="relu", output_dim=256))
        decoder.add(RepeatVector(self.input_shape[1]))
        decoder.add(Model_D4)
        decoder.add(Model_D5)
        decoder.add(TimeDistributed(Dense(self.input_shape[2])))

        model = Sequential([encoder, decoder], name="DLC_Autoencoder")

        model.compile(
            loss=Huber(reduction="sum", delta=100.0),
            optimizer=Adam(lr=self.learn_rate, clipvalue=0.5),
            metrics=["mae"],
        )

        return model
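
# Usage sketch (hypothetical values, chosen to be consistent with the hardcoded
# decoder dims above; X_train is assumed to be shaped (samples, timesteps,
# features)):
#
#   ae = SEQ_2_SEQ_AE(
#       input_shape=X_train.shape,
#       CONV_filters=256,
#       LSTM_units_1=256,
#       LSTM_units_2=128,
#       DENSE_2=64,
#       DROPOUT_RATE=0.2,
#       ENCODING=16,
#       learn_rate=1e-3,
#   )
#   model = ae.build()
#   model.fit(X_train, X_train, epochs=50)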


class SEQ_2_SEQ_VAE:
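    """Sequence-to-sequence variational autoencoder on the same architecture as
    SEQ_2_SEQ_AE; the Gaussian latent space is regularized with an ELBO (KL
    divergence) and/or MMD penalty, selected via the `loss` argument."""
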
    def __init__(
        self,
        input_shape,
        CONV_filters,
        LSTM_units_1,
        LSTM_units_2,
        DENSE_2,
        DROPOUT_RATE,
        ENCODING,
        learn_rate=1e-5,
        loss="ELBO+MMD",
    ):
        self.input_shape = input_shape
        self.CONV_filters = CONV_filters
        self.LSTM_units_1 = LSTM_units_1
        self.LSTM_units_2 = LSTM_units_2
        self.DENSE_1 = LSTM_units_2
        self.DENSE_2 = DENSE_2
        self.DROPOUT_RATE = DROPOUT_RATE
        self.ENCODING = ENCODING
        self.learn_rate = learn_rate
        self.loss = loss

    def build(self):
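        """Builds the VAE; returns the encoder output, the standalone generator
        and the compiled end-to-end model."""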
        # Encoder Layers
        Model_E0 = tf.keras.layers.Conv1D(
            filters=self.CONV_filters,
            kernel_size=5,
            strides=1,
            padding="causal",
            activation="relu",
        )
        Model_E1 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E2 = Bidirectional(
            LSTM(
                self.LSTM_units_2,
                activation="tanh",
                return_sequences=False,
                kernel_constraint=UnitNorm(axis=0),
            )
        )
        Model_E3 = Dense(
            self.DENSE_1, activation="relu", kernel_constraint=UnitNorm(axis=0)
        )
        Model_E4 = Dense(
            self.DENSE_2, activation="relu", kernel_constraint=UnitNorm(axis=0)
        )
        Model_E5 = Dense(
            self.ENCODING,
            activation="relu",
            kernel_constraint=UnitNorm(axis=1),
            activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
        )

        # Decoder layers
        Model_D4 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="tanh",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )
        Model_D5 = Bidirectional(
            LSTM(
                self.LSTM_units_1,
                activation="sigmoid",
                return_sequences=True,
                kernel_constraint=UnitNorm(axis=1),
            )
        )

        # Define and instantiate encoder
        x = Input(shape=self.input_shape[1:])
        encoder = Model_E0(x)
        encoder = Model_E1(encoder)
        encoder = Model_E2(encoder)
        encoder = Model_E3(encoder)
        encoder = Dropout(self.DROPOUT_RATE)(encoder)
        encoder = Model_E4(encoder)
        encoder = Model_E5(encoder)

        z_mean = Dense(self.ENCODING)(encoder)
        z_log_sigma = Dense(self.ENCODING)(encoder)
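
        # z_mean and z_log_sigma parameterize a diagonal Gaussian posterior over
        # the latent code. Depending on self.loss, a KL-divergence (ELBO) and/or
        # a maximum mean discrepancy (MMD) penalty is applied below; the custom
        # layers from source.model_utils are assumed to register these penalties
        # via add_loss, and `sampling` is assumed to implement the standard
        # reparameterization trick.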

        if "ELBO" in self.loss:
            z_mean, z_log_sigma = KLDivergenceLayer()([z_mean, z_log_sigma])

        z = Lambda(sampling)([z_mean, z_log_sigma])

        if "MMD" in self.loss:
            z = MMDiscrepancyLayer()(z)

        # Define and instantiate decoder
        decoder = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING
        )(z)
        decoder = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2)(
            decoder
        )
        decoder = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1)(
            decoder
        )
        decoder = RepeatVector(self.input_shape[1])(decoder)
        decoder = Model_D4(decoder)
        decoder = Model_D5(decoder)
        x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(decoder)

        # Define and instantiate generator: decodes latent samples back into
        # sequences, sharing weights with the decoder above
        _decoder_input = Input(shape=(self.ENCODING,))
        _decoder = DenseTranspose(
            Model_E5, activation="relu", output_dim=self.ENCODING
        )(_decoder_input)
        _decoder = DenseTranspose(Model_E4, activation="relu", output_dim=self.DENSE_2)(
            _decoder
        )
        _decoder = DenseTranspose(Model_E3, activation="relu", output_dim=self.DENSE_1)(
            _decoder
        )
        _decoder = RepeatVector(self.input_shape[1])(_decoder)
        _decoder = Model_D4(_decoder)
        _decoder = Model_D5(_decoder)
        _x_decoded_mean = TimeDistributed(Dense(self.input_shape[2]))(_decoder)
        generator = Model(_decoder_input, _x_decoded_mean)

        # end-to-end autoencoder
        vae = Model(x, x_decoded_mean)

        def huber_loss(x_, x_decoded_mean_):
            # sum-reduced Huber loss, scaled by the sequence dimensions
            huber = Huber(reduction="sum", delta=100.0)
            return self.input_shape[1:] * huber(x_, x_decoded_mean_)

        vae.compile(
            loss=huber_loss,
            optimizer=Adam(lr=self.learn_rate),
            metrics=["mae"],
            experimental_run_tf_function=False,
        )

        return encoder, generator, vae
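
# Note: SEQ_2_SEQ_VAE.build() returns the encoder *output tensor* (not a
# standalone Model), a weight-sharing generator that decodes latent samples,
# and the compiled end-to-end `vae` model used for training.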


class SEQ_2_SEQ_MVAE:
    pass


class SEQ_2_SEQ_MMVAE:
    pass