path: root/code/sunlab/sunflow/models/encoder.py
blob: 22d1a9a0094e8be44ecafefac602d8a717e6f420
class Encoder:
    """# Encoder Model

    Constructs an encoder model with a configurable depth of fixed-size
    intermediate layers"""

    def __init__(self, model_base_directory):
        """# Encoder Model Initialization

        - model_base_directory: The base directory where the model will be
        saved/loaded"""
        self.model_base_directory = model_base_directory

    def init(self):
        """# Initialize a new Encoder

        Expects a model parameters file to already exist in the model base
        directory (see load_parameters) before the model is built"""
        from tensorflow import keras
        from tensorflow.keras import layers

        # Load in the model parameters
        self.load_parameters()
        assert self.depth >= 0, "Depth must be non-negative"

        # Create the model
        self.model = keras.models.Sequential()
        # At zero depth, connect input and output layer directly
        if self.depth == 0:
            self.model.add(
                layers.Dense(
                    self.latent_size,
                    input_shape=(self.data_size,),
                    activation=None,
                    name="encoder_latent_vector",
                )
            )
        # Otherwise, add fixed-sized layers between them
        else:
            self.model.add(
                layers.Dense(
                    self.layer_size,
                    input_shape=(self.data_size,),
                    activation=None,
                    name="encoder_dense_1",
                )
            )
            # Use LeakyReLU if specified
            if self.use_leaky_relu:
                self.model.add(layers.LeakyReLU())
            else:
                self.model.add(layers.ReLU())
            # Include a dropout layer if specified
            if self.dropout > 0.0:
                self.model.add(layers.Dropout(self.dropout))
            for _d in range(1, self.depth):
                self.model.add(
                    layers.Dense(
                        self.layer_size, activation=None, name=f"encoder_dense_{_d+1}"
                    )
                )
                # Use LeakyReLU if specified
                if self.use_leaky_relu:
                    self.model.add(layers.LeakyReLU())
                else:
                    self.model.add(layers.ReLU())
                # Include a dropout layer if specified
                if self.dropout > 0.0:
                    self.model.add(layers.Dropout(self.dropout))
            self.model.add(
                layers.Dense(
                    self.latent_size, activation=None, name="encoder_latent_vector"
                )
            )
        self.model._name = "Encoder"
        return self
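
    # Illustrative layer stack produced by init() under assumed example
    # parameters (depth=2, layer_size=64, latent_size=2, dropout=0.2,
    # use_leaky_relu=True; these values are not defaults of this repository):
    #   Dense(64) -> LeakyReLU -> Dropout(0.2)
    #   Dense(64) -> LeakyReLU -> Dropout(0.2)
    #   Dense(2, name="encoder_latent_vector")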

    def load(self):
        """# Load an existing Encoder"""
        from os import listdir

        # If the encoder is not found, return None
        if "encoder.keras" not in listdir(f"{self.model_base_directory}/portable/"):
            return None
        # Otherwise, load the encoder
        #  compile=False suppresses warnings about training
        #  If you want to train it, you will need to recompile it
        import tensorflow as tf

        self.model = tf.keras.models.load_model(
            f"{self.model_base_directory}/portable/encoder.keras", compile=False
        )
        self.model._name = "Encoder"
        return self

    def save(self, overwrite=False):
        """# Save the current Encoder

        - overwrite: overwrite any existing saved encoder"""
        from os import listdir

        if overwrite:
            self.model.save(f"{self.model_base_directory}/portable/encoder.keras")
            return True
        if "encoder.keras" in listdir(f"{self.model_base_directory}/portable/"):
            return False
        self.model.save(f"{self.model_base_directory}/portable/encoder.keras")
        return True

    def load_parameters(self):
        """# Load Encoder Model Parameters from File
        The file needs to have the following parameters defined:
         - data_size: int
         - autoencoder_layer_size: int
         - latent_size: int
         - autoencoder_depth: int
         - dropout: float (set to 0. if you don't want a dropout layer)
         - use_leaky_relu: boolean"""
        from pickle import load

        with open(
            f"{self.model_base_directory}/portable/model_parameters.pkl", "rb"
        ) as phandle:
            parameters = load(phandle)
        self.data_size = parameters["data_size"]
        self.layer_size = parameters["autoencoder_layer_size"]
        self.latent_size = parameters["latent_size"]
        self.depth = parameters["autoencoder_depth"]
        self.dropout = parameters["dropout"]
        self.use_leaky_relu = parameters["use_leaky_relu"]
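
    # Illustrative contents of model_parameters.pkl (values are assumptions
    # chosen for the example, not values shipped with this repository):
    #   {"data_size": 13, "autoencoder_layer_size": 64, "latent_size": 2,
    #    "autoencoder_depth": 3, "dropout": 0.2, "use_leaky_relu": True}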

    def summary(self):
        """# Returns the summary of the Encoder model"""
        return self.model.summary()

    def __call__(self, *args, **kwargs):
        """# Callable

        Calling an Encoder instance forwards the call to the underlying Keras
        model and returns its output"""
        return self.model(*args, **kwargs)
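

# A minimal usage sketch, assuming tensorflow and numpy are installed, a
# writable "/tmp/sunflow_encoder" directory, and toy parameter values; none
# of these come from this repository. It writes the expected
# model_parameters.pkl, builds a fresh Encoder, saves it, and encodes a
# random batch.
if __name__ == "__main__":
    from os import makedirs
    from pickle import dump

    import numpy as np

    base = "/tmp/sunflow_encoder"  # hypothetical model base directory
    makedirs(f"{base}/portable", exist_ok=True)

    # Parameters expected by load_parameters(); values here are illustrative
    parameters = {
        "data_size": 13,
        "autoencoder_layer_size": 64,
        "latent_size": 2,
        "autoencoder_depth": 3,
        "dropout": 0.2,
        "use_leaky_relu": True,
    }
    with open(f"{base}/portable/model_parameters.pkl", "wb") as phandle:
        dump(parameters, phandle)

    # Build, inspect, and save a new encoder
    encoder = Encoder(base).init()
    encoder.summary()
    encoder.save(overwrite=True)

    # Encode a random batch of 4 vectors into the 2-dimensional latent space
    batch = np.random.rand(4, parameters["data_size"]).astype("float32")
    print(encoder(batch).shape)  # expected: (4, 2)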