diff options
Diffstat (limited to 'code/sunlab/sunflow')
-rw-r--r-- | code/sunlab/sunflow/__init__.py | 6 | ||||
-rw-r--r-- | code/sunlab/sunflow/data/__init__.py | 1 | ||||
-rw-r--r-- | code/sunlab/sunflow/data/utilities.py | 53 | ||||
-rw-r--r-- | code/sunlab/sunflow/models/__init__.py | 7 | ||||
-rw-r--r-- | code/sunlab/sunflow/models/adversarial_autoencoder.py | 344 | ||||
-rw-r--r-- | code/sunlab/sunflow/models/autoencoder.py | 85 | ||||
-rw-r--r-- | code/sunlab/sunflow/models/decoder.py | 127 | ||||
-rw-r--r-- | code/sunlab/sunflow/models/discriminator.py | 132 | ||||
-rw-r--r-- | code/sunlab/sunflow/models/encoder.py | 140 | ||||
-rw-r--r-- | code/sunlab/sunflow/models/encoder_discriminator.py | 96 | ||||
-rw-r--r-- | code/sunlab/sunflow/models/utilities.py | 93 | ||||
-rw-r--r-- | code/sunlab/sunflow/plotting/__init__.py | 1 | ||||
-rw-r--r-- | code/sunlab/sunflow/plotting/model_extensions.py | 289 |
13 files changed, 1374 insertions, 0 deletions
diff --git a/code/sunlab/sunflow/__init__.py b/code/sunlab/sunflow/__init__.py new file mode 100644 index 0000000..6e0c959 --- /dev/null +++ b/code/sunlab/sunflow/__init__.py @@ -0,0 +1,6 @@ +from ..common import * + +from .models import * + +from .data import * +from .plotting import * diff --git a/code/sunlab/sunflow/data/__init__.py b/code/sunlab/sunflow/data/__init__.py new file mode 100644 index 0000000..b9a32c0 --- /dev/null +++ b/code/sunlab/sunflow/data/__init__.py @@ -0,0 +1 @@ +from .utilities import * diff --git a/code/sunlab/sunflow/data/utilities.py b/code/sunlab/sunflow/data/utilities.py new file mode 100644 index 0000000..dcdc36e --- /dev/null +++ b/code/sunlab/sunflow/data/utilities.py @@ -0,0 +1,53 @@ +from sunlab.common import ShapeDataset +from sunlab.common import MaxAbsScaler + + +def process_and_load_dataset( + dataset_file, model_folder, magnification=10, scaler=MaxAbsScaler +): + """# Load a dataset and process a models' Latent Space on the Dataset""" + from ..models import load_aae + from sunlab.common import import_full_dataset + + model = load_aae(model_folder, normalization_scaler=scaler) + dataset = import_full_dataset( + dataset_file, magnification=magnification, scaler=model.scaler + ) + latent = model.encoder(dataset.dataset).numpy() + assert len(latent.shape) == 2, "Only 1D Latent Vectors Supported" + for dim in range(latent.shape[1]): + dataset.dataframe[f"Latent-{dim}"] = latent[:, dim] + return dataset + + +def process_and_load_datasets( + dataset_file_list, model_folder, magnification=10, scaler=MaxAbsScaler +): + from pandas import concat + from ..models import load_aae + + dataframes = [] + datasets = [] + for dataset_file in dataset_file_list: + dataset = process_and_load_dataset( + dataset_file, model_folder, magnification, scaler + ) + model = load_aae(model_folder, normalization_scaler=scaler) + dataframe = dataset.dataframe + for label in ["ActinEdge", "Filopodia", "Bleb", "Lamellipodia"]: + if label in dataframe.columns: + dataframe[label.lower()] = dataframe[label] + if label.lower() not in dataframe.columns: + dataframe[label.lower()] = 0 + latent_columns = [f"Latent-{dim}" for dim in range(model.latent_size)] + datasets.append(dataset) + dataframes.append( + dataframe[ + dataset.data_columns + + dataset.label_columns + + latent_columns + + ["Frames", "CellNum"] + + ["actinedge", "filopodia", "bleb", "lamellipodia"] + ] + ) + return datasets, concat(dataframes) diff --git a/code/sunlab/sunflow/models/__init__.py b/code/sunlab/sunflow/models/__init__.py new file mode 100644 index 0000000..f111c0a --- /dev/null +++ b/code/sunlab/sunflow/models/__init__.py @@ -0,0 +1,7 @@ +from .autoencoder import Autoencoder +from .adversarial_autoencoder import AdversarialAutoencoder +from sunlab.common.data.dataset import Dataset +from sunlab.common.distribution.adversarial_distribution import AdversarialDistribution +from sunlab.common.scaler.adversarial_scaler import AdversarialScaler +from .utilities import create_aae, create_aae_and_dataset +from .utilities import load_aae, load_aae_and_dataset diff --git a/code/sunlab/sunflow/models/adversarial_autoencoder.py b/code/sunlab/sunflow/models/adversarial_autoencoder.py new file mode 100644 index 0000000..4cbb2f8 --- /dev/null +++ b/code/sunlab/sunflow/models/adversarial_autoencoder.py @@ -0,0 +1,344 @@ +from sunlab.common.data.dataset import Dataset +from sunlab.common.scaler.adversarial_scaler import AdversarialScaler +from sunlab.common.distribution.adversarial_distribution import AdversarialDistribution +from .encoder import Encoder +from .decoder import Decoder +from .discriminator import Discriminator +from .encoder_discriminator import EncoderDiscriminator +from .autoencoder import Autoencoder +from tensorflow.keras import optimizers, metrics, losses +import tensorflow as tf +from numpy import ones, zeros, float32, NaN + + +class AdversarialAutoencoder: + """# Adversarial Autoencoder + - distribution: The distribution used by the adversary to learn on""" + + def __init__( + self, + model_base_directory, + distribution: AdversarialDistribution or None = None, + scaler: AdversarialScaler or None = None, + ): + """# Adversarial Autoencoder Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded + - distribution: The distribution the adversary will use + - scaler: The scaling function the model will assume on the data""" + self.model_base_directory = model_base_directory + if distribution is not None: + self.distribution = distribution + else: + self.distribution = None + if scaler is not None: + self.scaler = scaler(self.model_base_directory) + else: + self.scaler = None + + def init( + self, + data=None, + data_size=13, + autoencoder_layer_size=16, + adversary_layer_size=8, + latent_size=2, + autoencoder_depth=2, + dropout=0.0, + use_leaky_relu=False, + **kwargs, + ): + """# Initialize AAE model parameters + - data_size: int + - autoencoder_layer_size: int + - adversary_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float + - use_leaky_relu: boolean""" + self.data_size = data_size + self.autoencoder_layer_size = autoencoder_layer_size + self.adversary_layer_size = adversary_layer_size + self.latent_size = latent_size + self.autoencoder_depth = autoencoder_depth + self.dropout = dropout + self.use_leaky_relu = use_leaky_relu + self.save_parameters() + self.encoder = Encoder(self.model_base_directory).init() + self.decoder = Decoder(self.model_base_directory).init() + self.autoencoder = Autoencoder(self.model_base_directory).init( + self.encoder, self.decoder + ) + self.discriminator = Discriminator(self.model_base_directory).init() + self.encoder_discriminator = EncoderDiscriminator( + self.model_base_directory + ).init(self.encoder, self.discriminator) + if self.distribution is not None: + self.distribution = self.distribution(self.latent_size) + if (data is not None) and (self.scaler is not None): + self.scaler = self.scaler.init(data) + self.init_optimizers_and_metrics(**kwargs) + return self + + def init_optimizers_and_metrics( + self, + optimizer=optimizers.Adam, + ae_metric=metrics.MeanAbsoluteError, + adv_metric=metrics.BinaryCrossentropy, + ae_lr=7e-4, + adv_lr=3e-4, + loss_fn=losses.BinaryCrossentropy, + **kwargs, + ): + """# Set the optimizer, loss function, and metrics""" + self.ae_optimizer = optimizer(learning_rate=ae_lr) + self.adv_optimizer = optimizer(learning_rate=adv_lr) + self.gan_optimizer = optimizer(learning_rate=adv_lr) + self.train_ae_metric = ae_metric() + self.val_ae_metric = ae_metric() + self.train_adv_metric = adv_metric() + self.val_adv_metric = adv_metric() + self.train_gan_metric = adv_metric() + self.val_gan_metric = adv_metric() + self.loss_fn = loss_fn() + + def load(self): + """# Load the models from their respective files""" + self.load_parameters() + self.encoder = Encoder(self.model_base_directory).load() + self.decoder = Decoder(self.model_base_directory).load() + self.autoencoder = Autoencoder(self.model_base_directory).load() + self.discriminator = Discriminator(self.model_base_directory).load() + self.encoder_discriminator = EncoderDiscriminator( + self.model_base_directory + ).load() + if self.scaler is not None: + self.scaler = self.scaler.load() + return self + + def save(self, overwrite=False): + """# Save each model in the AAE""" + self.encoder.save(overwrite=overwrite) + self.decoder.save(overwrite=overwrite) + self.autoencoder.save(overwrite=overwrite) + self.discriminator.save(overwrite=overwrite) + self.encoder_discriminator.save(overwrite=overwrite) + if self.scaler is not None: + self.scaler.save() + + def save_parameters(self): + """# Save the AAE parameters in a file""" + from pickle import dump + from os import makedirs + + makedirs(self.model_base_directory + "/portable/", exist_ok=True) + parameters = { + "data_size": self.data_size, + "autoencoder_layer_size": self.autoencoder_layer_size, + "adversary_layer_size": self.adversary_layer_size, + "latent_size": self.latent_size, + "autoencoder_depth": self.autoencoder_depth, + "dropout": self.dropout, + "use_leaky_relu": self.use_leaky_relu, + } + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "wb" + ) as phandle: + dump(parameters, phandle) + + def load_parameters(self): + """# Load the AAE parameters from a file""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.autoencoder_layer_size = parameters["autoencoder_layer_size"] + self.adversary_layer_size = parameters["adversary_layer_size"] + self.latent_size = parameters["latent_size"] + self.autoencoder_depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + return parameters + + def summary(self): + """# Summarize each model in the AAE""" + self.encoder.summary() + self.decoder.summary() + self.autoencoder.summary() + self.discriminator.summary() + self.encoder_discriminator.summary() + + @tf.function + def train_step(self, x, y): + """# Training Step + + 1. Train the Autoencoder + 2. (If distribution is given) Train the discriminator + 3. (If the distribution is given) Train the encoder_discriminator""" + # Autoencoder Training + with tf.GradientTape() as tape: + decoded_vector = self.autoencoder(x, training=True) + ae_loss_value = self.loss_fn(y, decoded_vector) + grads = tape.gradient(ae_loss_value, self.autoencoder.model.trainable_weights) + self.ae_optimizer.apply_gradients( + zip(grads, self.autoencoder.model.trainable_weights) + ) + self.train_ae_metric.update_state(y, decoded_vector) + if self.distribution is not None: + # Adversary Trainig + with tf.GradientTape() as tape: + latent_vector = self.encoder(x) + fakepred = self.distribution(x.shape[0]) + discbatch_x = tf.concat([latent_vector, fakepred], axis=0) + discbatch_y = tf.concat([zeros(x.shape[0]), ones(x.shape[0])], axis=0) + adversary_vector = self.discriminator(discbatch_x, training=True) + adv_loss_value = self.loss_fn(discbatch_y, adversary_vector) + grads = tape.gradient( + adv_loss_value, self.discriminator.model.trainable_weights + ) + self.adv_optimizer.apply_gradients( + zip(grads, self.discriminator.model.trainable_weights) + ) + self.train_adv_metric.update_state(discbatch_y, adversary_vector) + # Gan Training + with tf.GradientTape() as tape: + gan_vector = self.encoder_discriminator(x, training=True) + adv_vector = tf.convert_to_tensor(ones((x.shape[0], 1), dtype=float32)) + gan_loss_value = self.loss_fn(gan_vector, adv_vector) + grads = tape.gradient(gan_loss_value, self.encoder.model.trainable_weights) + self.gan_optimizer.apply_gradients( + zip(grads, self.encoder.model.trainable_weights) + ) + self.train_gan_metric.update_state(adv_vector, gan_vector) + return (ae_loss_value, adv_loss_value, gan_loss_value) + return (ae_loss_value, None, None) + + @tf.function + def test_step(self, x, y): + """# Test Step - On validation data + + 1. Evaluate the Autoencoder + 2. (If distribution is given) Evaluate the discriminator + 3. (If the distribution is given) Evaluate the encoder_discriminator""" + val_decoded_vector = self.autoencoder(x, training=False) + self.val_ae_metric.update_state(y, val_decoded_vector) + + if self.distribution is not None: + latent_vector = self.encoder(x) + fakepred = self.distribution(x.shape[0]) + discbatch_x = tf.concat([latent_vector, fakepred], axis=0) + discbatch_y = tf.concat([zeros(x.shape[0]), ones(x.shape[0])], axis=0) + adversary_vector = self.discriminator(discbatch_x, training=False) + self.val_adv_metric.update_state(discbatch_y, adversary_vector) + + gan_vector = self.encoder_discriminator(x, training=False) + self.val_gan_metric.update_state(ones(x.shape[0]), gan_vector) + + # Garbage Collect at the end of each epoch + def on_epoch_end(self, _epoch, logs=None): + """# Cleanup environment to prevent memory leaks each epoch""" + import gc + from tensorflow.keras import backend as k + + gc.collect() + k.clear_session() + + def train( + self, + dataset: Dataset, + epoch_count: int = 1, + output=False, + output_freq=1, + fmt="%i[%.3f]: %.2e %.2e %.2e %.2e %.2e %.2e", + ): + """# Train the model on a dataset + + - dataset: ataset = Dataset to train the model on, which as the + training and validation iterators set up + - epoch_count: int = The number of epochs to train + - output: boolean = Whether or not to output training information + - output_freq: int = The number of epochs between each output""" + from time import time + from numpy import array as narray + + def fmtter(x): + return x if x is not None else -1 + + epoch_data = [] + dataset.reset_iterators() + + self.test_step(dataset.dataset, dataset.dataset) + val_ae = self.val_ae_metric.result() + val_adv = self.val_adv_metric.result() + val_gan = self.val_gan_metric.result() + self.val_ae_metric.reset_states() + self.val_adv_metric.reset_states() + self.val_gan_metric.reset_states() + print( + fmt + % ( + 0, + NaN, + val_ae, + fmtter(val_adv), + fmtter(val_gan), + NaN, + NaN, + NaN, + ) + ) + for epoch in range(epoch_count): + start_time = time() + + for step, (x_batch_train, y_batch_train) in enumerate(dataset.training): + ae_lv, adv_lv, gan_lv = self.train_step(x_batch_train, x_batch_train) + + train_ae = self.train_ae_metric.result() + train_adv = self.train_adv_metric.result() + train_gan = self.train_gan_metric.result() + self.train_ae_metric.reset_states() + self.train_adv_metric.reset_states() + self.train_gan_metric.reset_states() + + for step, (x_batch_val, y_batch_val) in enumerate(dataset.validation): + self.test_step(x_batch_val, x_batch_val) + + val_ae = self.val_ae_metric.result() + val_adv = self.val_adv_metric.result() + val_gan = self.val_gan_metric.result() + self.val_ae_metric.reset_states() + self.val_adv_metric.reset_states() + self.val_gan_metric.reset_states() + + epoch_data.append( + ( + epoch, + train_ae, + val_ae, + fmtter(train_adv), + fmtter(val_adv), + fmtter(train_gan), + fmtter(val_gan), + ) + ) + if output and (epoch + 1) % output_freq == 0: + print( + fmt + % ( + epoch + 1, + time() - start_time, + train_ae, + fmtter(train_adv), + fmtter(train_gan), + val_ae, + fmtter(val_adv), + fmtter(val_gan), + ) + ) + self.on_epoch_end(epoch) + dataset.reset_iterators() + return narray(epoch_data) diff --git a/code/sunlab/sunflow/models/autoencoder.py b/code/sunlab/sunflow/models/autoencoder.py new file mode 100644 index 0000000..473d00d --- /dev/null +++ b/code/sunlab/sunflow/models/autoencoder.py @@ -0,0 +1,85 @@ +class Autoencoder: + """# Autoencoder Model + + Constructs an encoder-decoder model""" + + def __init__(self, model_base_directory): + """# Autoencoder Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded""" + self.model_base_directory = model_base_directory + + def init(self, encoder, decoder): + """# Initialize an Autoencoder + + - encoder: The encoder to use + - decoder: The decoder to use""" + from tensorflow import keras + + self.load_parameters() + self.model = keras.models.Sequential() + self.model.add(encoder.model) + self.model.add(decoder.model) + self.model._name = "Autoencoder" + return self + + def load(self): + """# Load an existing Autoencoder""" + from os import listdir + + if "autoencoder.keras" not in listdir(f"{self.model_base_directory}/portable/"): + return None + import tensorflow as tf + + self.model = tf.keras.models.load_model( + f"{self.model_base_directory}/portable/autoencoder.keras", compile=False + ) + self.model._name = "Autoencoder" + return self + + def save(self, overwrite=False): + """# Save the current Autoencoder + + - Overwrite: overwrite any existing autoencoder that has been saved""" + from os import listdir + + if overwrite: + self.model.save(f"{self.model_base_directory}/portable/autoencoder.keras") + return True + if "autoencoder.keras" in listdir(f"{self.model_base_directory}/portable/"): + return False + self.model.save(f"{self.model_base_directory}/portable/autoencoder.keras") + return True + + def load_parameters(self): + """# Load Autoencoder Model Parameters from File + The file needs to have the following parameters defined: + - data_size: int + - autoencoder_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float (set to 0. if you don't want a dropout layer) + - use_leaky_relu: boolean""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.layer_size = parameters["autoencoder_layer_size"] + self.latent_size = parameters["latent_size"] + self.depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + + def summary(self): + """# Returns the summary of the Autoencoder model""" + return self.model.summary() + + def __call__(self, *args, **kwargs): + """# Callable + + When calling the autoencoder class, return the model's output""" + return self.model(*args, **kwargs) diff --git a/code/sunlab/sunflow/models/decoder.py b/code/sunlab/sunflow/models/decoder.py new file mode 100644 index 0000000..40ea190 --- /dev/null +++ b/code/sunlab/sunflow/models/decoder.py @@ -0,0 +1,127 @@ +class Decoder: + """# Decoder Model + + Constructs a decoder model with a certain depth of intermediate layers of + fixed size""" + + def __init__(self, model_base_directory): + """# Decoder Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded""" + self.model_base_directory = model_base_directory + + def init(self): + """# Initialize a new Decoder + + Expects a model parameters file to already exist in the initialization + base directory when initializing the model""" + from tensorflow import keras + from tensorflow.keras import layers + + self.load_parameters() + assert self.depth >= 0, "Depth must be non-negative" + self.model = keras.models.Sequential() + if self.depth == 0: + self.model.add( + layers.Dense( + self.data_size, + input_shape=(self.latent_size,), + activation=None, + name="decoder_latent_vector", + ) + ) + else: + self.model.add( + layers.Dense( + self.layer_size, + input_shape=(self.latent_size,), + activation=None, + name="decoder_dense_1", + ) + ) + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + for _d in range(1, self.depth): + self.model.add( + layers.Dense( + self.layer_size, activation=None, name=f"decoder_dense_{_d+1}" + ) + ) + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + self.model.add( + layers.Dense( + self.data_size, activation=None, name="decoder_output_vector" + ) + ) + self.model._name = "Decoder" + return self + + def load(self): + """# Load an existing Decoder""" + from os import listdir + + if "decoder.keras" not in listdir(f"{self.model_base_directory}/portable/"): + return None + import tensorflow as tf + + self.model = tf.keras.models.load_model( + f"{self.model_base_directory}/portable/decoder.keras", compile=False + ) + self.model._name = "Decoder" + return self + + def save(self, overwrite=False): + """# Save the current Decoder + + - Overwrite: overwrite any existing decoder that has been saved""" + from os import listdir + + if overwrite: + self.model.save(f"{self.model_base_directory}/portable/decoder.keras") + return True + if "decoder.keras" in listdir(f"{self.model_base_directory}/portable/"): + return False + self.model.save(f"{self.model_base_directory}/portable/decoder.keras") + return True + + def load_parameters(self): + """# Load Decoder Model Parameters from File + The file needs to have the following parameters defined: + - data_size: int + - autoencoder_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float (set to 0. if you don't want a dropout layer) + - use_leaky_relu: boolean""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.layer_size = parameters["autoencoder_layer_size"] + self.latent_size = parameters["latent_size"] + self.depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + + def summary(self): + """# Returns the summary of the Decoder model""" + return self.model.summary() + + def __call__(self, *args, **kwargs): + """# Callable + + When calling the decoder class, return the model's output""" + return self.model(*args, **kwargs) diff --git a/code/sunlab/sunflow/models/discriminator.py b/code/sunlab/sunflow/models/discriminator.py new file mode 100644 index 0000000..38bed56 --- /dev/null +++ b/code/sunlab/sunflow/models/discriminator.py @@ -0,0 +1,132 @@ +class Discriminator: + """# Discriminator Model + + Constructs a discriminator model with a certain depth of intermediate + layers of fixed size""" + + def __init__(self, model_base_directory): + """# Discriminator Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded""" + self.model_base_directory = model_base_directory + + def init(self): + """# Initialize a new Discriminator + + Expects a model parameters file to already exist in the initialization + base directory when initializing the model""" + from tensorflow import keras + from tensorflow.keras import layers + + self.load_parameters() + assert self.depth >= 0, "Depth must be non-negative" + self.model = keras.models.Sequential() + if self.depth == 0: + self.model.add( + layers.Dense( + 1, + input_shape=(self.latent_size,), + activation=None, + name="discriminator_output_vector", + ) + ) + else: + self.model.add( + layers.Dense( + self.layer_size, + input_shape=(self.latent_size,), + activation=None, + name="discriminator_dense_1", + ) + ) + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + for _d in range(1, self.depth): + self.model.add( + layers.Dense( + self.layer_size, + activation=None, + name=f"discriminator_dense_{_d+1}", + ) + ) + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + self.model.add( + layers.Dense( + 1, activation="sigmoid", name="discriminator_output_vector" + ) + ) + self.model._name = "Discriminator" + return self + + def load(self): + """# Load an existing Discriminator""" + from os import listdir + + if "discriminator.keras" not in listdir( + f"{self.model_base_directory}/portable/" + ): + return None + import tensorflow as tf + + self.model = tf.keras.models.load_model( + f"{self.model_base_directory}/portable/discriminator.keras", compile=False + ) + self.model._name = "Discriminator" + return self + + def save(self, overwrite=False): + """# Save the current Discriminator + + - Overwrite: overwrite any existing discriminator that has been + saved""" + from os import listdir + + if overwrite: + self.model.save(f"{self.model_base_directory}/portable/discriminator.keras") + return True + if "discriminator.keras" in listdir(f"{self.model_base_directory}/portable/"): + return False + self.model.save(f"{self.model_base_directory}/portable/discriminator.keras") + return True + + def load_parameters(self): + """# Load Discriminator Model Parameters from File + The file needs to have the following parameters defined: + - data_size: int + - adversary_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float (set to 0. if you don't want a dropout layer) + - use_leaky_relu: boolean""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.layer_size = parameters["adversary_layer_size"] + self.latent_size = parameters["latent_size"] + self.depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + + def summary(self): + """# Returns the summary of the Discriminator model""" + return self.model.summary() + + def __call__(self, *args, **kwargs): + """# Callable + + When calling the discriminator class, return the model's output""" + return self.model(*args, **kwargs) diff --git a/code/sunlab/sunflow/models/encoder.py b/code/sunlab/sunflow/models/encoder.py new file mode 100644 index 0000000..22d1a9a --- /dev/null +++ b/code/sunlab/sunflow/models/encoder.py @@ -0,0 +1,140 @@ +class Encoder: + """# Encoder Model + + Constructs an encoder model with a certain depth of intermediate layers of + fixed size""" + + def __init__(self, model_base_directory): + """# Encoder Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded""" + self.model_base_directory = model_base_directory + + def init(self): + """# Initialize a new Encoder + + Expects a model parameters file to already exist in the initialization + base directory when initializing the model""" + from tensorflow import keras + from tensorflow.keras import layers + + # Load in the model parameters + self.load_parameters() + assert self.depth >= 0, "Depth must be non-negative" + + # Create the model + self.model = keras.models.Sequential() + # At zero depth, connect input and output layer directly + if self.depth == 0: + self.model.add( + layers.Dense( + self.latent_size, + input_shape=(self.data_size,), + activation=None, + name="encoder_latent_vector", + ) + ) + # Otherwise, add fixed-sized layers between them + else: + self.model.add( + layers.Dense( + self.layer_size, + input_shape=(self.data_size,), + activation=None, + name="encoder_dense_1", + ) + ) + # Use LeakyReLU if specified + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + # Include a droput layer if specified + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + for _d in range(1, self.depth): + self.model.add( + layers.Dense( + self.layer_size, activation=None, name=f"encoder_dense_{_d+1}" + ) + ) + # Use LeakyReLU if specified + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + # Include a droput layer if specified + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + self.model.add( + layers.Dense( + self.latent_size, activation=None, name="encoder_latent_vector" + ) + ) + self.model._name = "Encoder" + return self + + def load(self): + """# Load an existing Encoder""" + from os import listdir + + # If the encoder is not found, return None + if "encoder.keras" not in listdir(f"{self.model_base_directory}/portable/"): + return None + # Otherwise, load the encoder + # compile=False suppresses warnings about training + # If you want to train it, you will need to recompile it + import tensorflow as tf + + self.model = tf.keras.models.load_model( + f"{self.model_base_directory}/portable/encoder.keras", compile=False + ) + self.model._name = "Encoder" + return self + + def save(self, overwrite=False): + """# Save the current Encoder + + - Overwrite: overwrite any existing encoder that has been saved""" + from os import listdir + + if overwrite: + self.model.save(f"{self.model_base_directory}/portable/encoder.keras") + return True + if "encoder.keras" in listdir(f"{self.model_base_directory}/portable/"): + return False + self.model.save(f"{self.model_base_directory}/portable/encoder.keras") + return True + + def load_parameters(self): + """# Load Encoder Model Parameters from File + The file needs to have the following parameters defined: + - data_size: int + - autoencoder_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float (set to 0. if you don't want a dropout layer) + - use_leaky_relu: boolean""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.layer_size = parameters["autoencoder_layer_size"] + self.latent_size = parameters["latent_size"] + self.depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + + def summary(self): + """# Returns the summary of the Encoder model""" + return self.model.summary() + + def __call__(self, *args, **kwargs): + """# Callable + + When calling the encoder class, return the model's output""" + return self.model(*args, **kwargs) diff --git a/code/sunlab/sunflow/models/encoder_discriminator.py b/code/sunlab/sunflow/models/encoder_discriminator.py new file mode 100644 index 0000000..5efb6af --- /dev/null +++ b/code/sunlab/sunflow/models/encoder_discriminator.py @@ -0,0 +1,96 @@ +class EncoderDiscriminator: + """# EncoderDiscriminator Model + + Constructs an encoder-discriminator model""" + + def __init__(self, model_base_directory): + """# EncoderDiscriminator Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded""" + self.model_base_directory = model_base_directory + + def init(self, encoder, discriminator): + """# Initialize a EncoderDiscriminator + + - encoder: The encoder to use + - discriminator: The discriminator to use""" + from tensorflow import keras + + self.load_parameters() + self.model = keras.models.Sequential() + self.model.add(encoder.model) + self.model.add(discriminator.model) + self.model._name = "EncoderDiscriminator" + return self + + def load(self): + """# Load an existing EncoderDiscriminator""" + from os import listdir + + if "encoder_discriminator.keras" not in listdir( + f"{self.model_base_directory}/portable/" + ): + return None + import tensorflow as tf + + self.model = tf.keras.models.load_model( + f"{self.model_base_directory}/portable/encoder_discriminator" + ".keras", + compile=False, + ) + self.model._name = "EncoderDiscriminator" + return self + + def save(self, overwrite=False): + """# Save the current EncoderDiscriminator + + - Overwrite: overwrite any existing encoder_discriminator that has been + saved""" + from os import listdir + + if overwrite: + self.model.save( + f"{self.model_base_directory}/portable/encoder_discriminator" + ".keras" + ) + return True + if "encoder_discriminator.keras" in listdir( + f"{self.model_base_directory}/portable/" + ): + return False + self.model.save( + f"{self.model_base_directory}/portable/encoder_discriminator" + ".keras" + ) + return True + + def load_parameters(self): + """# Load EncoderDiscriminator Model Parameters from File + The file needs to have the following parameters defined: + - data_size: int + - autoencoder_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float (set to 0. if you don't want a dropout layer) + - use_leaky_relu: boolean""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.layer_size = parameters["autoencoder_layer_size"] + self.latent_size = parameters["latent_size"] + self.depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + + def summary(self): + """# Returns the summary of the EncoderDiscriminator model""" + return self.model.summary() + + def __call__(self, *args, **kwargs): + """# Callable + + When calling the encoder_discriminator class, return the model's + output""" + return self.model(*args, **kwargs) diff --git a/code/sunlab/sunflow/models/utilities.py b/code/sunlab/sunflow/models/utilities.py new file mode 100644 index 0000000..ab0c2a6 --- /dev/null +++ b/code/sunlab/sunflow/models/utilities.py @@ -0,0 +1,93 @@ +# Higher-level functions + +from sunlab.common.distribution.adversarial_distribution import AdversarialDistribution +from sunlab.common.scaler.adversarial_scaler import AdversarialScaler +from sunlab.common.data.utilities import import_dataset +from .adversarial_autoencoder import AdversarialAutoencoder + + +def create_aae( + dataset_file_name, + model_directory, + normalization_scaler: AdversarialScaler, + distribution: AdversarialDistribution or None, + magnification=10, + latent_size=2, +): + """# Create Adversarial Autoencoder + + - dataset_file_name: str = Path to the dataset file + - model_directory: str = Path to save the model in + - normalization_scaler: AdversarialScaler = Data normalization Scaler Model + - distribution: AdversarialDistribution = Distribution for the Adversary + - magnification: int = The Magnification of the Dataset""" + dataset = import_dataset(dataset_file_name, magnification) + model = AdversarialAutoencoder( + model_directory, distribution, normalization_scaler + ).init(dataset.dataset, latent_size=latent_size) + return model + + +def create_aae_and_dataset( + dataset_file_name, + model_directory, + normalization_scaler: AdversarialScaler, + distribution: AdversarialDistribution or None, + magnification=10, + batch_size=1024, + shuffle=True, + val_split=0.1, + latent_size=2, +): + """# Create Adversarial Autoencoder and Load the Dataset + + - dataset_file_name: str = Path to the dataset file + - model_directory: str = Path to save the model in + - normalization_scaler: AdversarialScaler = Data normalization Scaler Model + - distribution: AdversarialDistribution = Distribution for the Adversary + - magnification: int = The Magnification of the Dataset""" + model = create_aae( + dataset_file_name, + model_directory, + normalization_scaler, + distribution, + magnification=magnification, + latent_size=latent_size, + ) + dataset = import_dataset( + dataset_file_name, + magnification, + batch_size=batch_size, + shuffle=shuffle, + val_split=val_split, + scaler=model.scaler, + ) + return model, dataset + + +def load_aae(model_directory, normalization_scaler: AdversarialScaler): + """# Load Adversarial Autoencoder + + - model_directory: str = Path to save the model in + - normalization_scaler: AdversarialScaler = Data normalization Scaler Model + """ + return AdversarialAutoencoder(model_directory, None, normalization_scaler).load() + + +def load_aae_and_dataset( + dataset_file_name, + model_directory, + normalization_scaler: AdversarialScaler, + magnification=10, +): + """# Load Adversarial Autoencoder + + - dataset_file_name: str = Path to the dataset file + - model_directory: str = Path to save the model in + - normalization_scaler: AdversarialScaler = Data normalization Scaler Model + - magnification: int = The Magnification of the Dataset""" + model = load_aae(model_directory, normalization_scaler) + dataset = import_dataset( + dataset_file_name, magnification=magnification, scaler=model.scaler + ) + return model, dataset diff --git a/code/sunlab/sunflow/plotting/__init__.py b/code/sunlab/sunflow/plotting/__init__.py new file mode 100644 index 0000000..36e00e6 --- /dev/null +++ b/code/sunlab/sunflow/plotting/__init__.py @@ -0,0 +1 @@ +from .model_extensions import * diff --git a/code/sunlab/sunflow/plotting/model_extensions.py b/code/sunlab/sunflow/plotting/model_extensions.py new file mode 100644 index 0000000..087f8d3 --- /dev/null +++ b/code/sunlab/sunflow/plotting/model_extensions.py @@ -0,0 +1,289 @@ +from matplotlib import pyplot as plt +from sunlab.common.data.shape_dataset import ShapeDataset +from sunlab.globals import DIR_ROOT + + +def get_nonphysical_masks( + model, + xrange=[-1, 1], + yrange=[-1, 1], + bins=[500, 500], + equivdiameter_threshold=10, + solidity_threshold=0.1, + area_threshold=100, + perimeter_threshold=10, + area_max_threshold=7000, + perimeter_max_threshold=350, + area_min_threshold=100, + perimeter_min_threshold=5, + consistency_check=False, +): + """# Generate the Nonphysical Masks in Grid for Model + + Hard Constraints: + - Non-negative values + - Ratios no greater than 1 + + Soft Constraints: + - Area/ Perimeter Thresholds""" + import numpy as np + + x = np.linspace(xrange[0], xrange[1], bins[0]) + y = np.linspace(yrange[0], yrange[1], bins[1]) + X, Y = np.meshgrid(x, y) + X, Y = X.reshape((bins[0], bins[1], 1)), Y.reshape((bins[0], bins[1], 1)) + XY = np.concatenate([X.reshape((-1, 1)), Y.reshape((-1, 1))], axis=-1) + dec_v = model.decoder(XY).numpy().reshape((bins[0] * bins[1], 13)) + lXY = model.scaler.scaler.inverse_transform(dec_v).reshape((bins[0], bins[1], 13)) + # Hard Limits + non_negative_mask = np.all(lXY > 0, axis=-1) + solidity_mask = np.abs(lXY[:, :, 6]) <= 1 + extent_upper_bound_mask = lXY[:, :, 7] <= 1 + # Soft Extremas + area_max_mask = lXY[:, :, 4] < area_max_threshold + perimeter_max_mask = lXY[:, :, 9] < perimeter_max_threshold + area_min_mask = lXY[:, :, 4] > area_min_threshold + perimeter_min_mask = lXY[:, :, 9] > perimeter_min_threshold + # Self-Consistency + man_solidity_mask = np.abs(lXY[:, :, 0] / lXY[:, :, 4]) <= 1 + equivalent_diameter_mask = ( + np.abs(lXY[:, :, 5] - np.sqrt(4 * np.abs(lXY[:, :, 0]) / np.pi)) + < equivdiameter_threshold + ) + convex_area_mask = lXY[:, :, 0] < lXY[:, :, 4] + area_threshold + convex_perimeter_mask = lXY[:, :, 9] < lXY[:, :, 8] + perimeter_threshold + mask_info = { + "non-negative": non_negative_mask, + "solidity": solidity_mask, + "extent-max": extent_upper_bound_mask, + # + "area-max": area_max_mask, + "perimeter-max": perimeter_max_mask, + "area-min": area_min_mask, + "perimeter-min": perimeter_min_mask, + # + "computed-solidity": man_solidity_mask, + "equivalent-diameter": equivalent_diameter_mask, + "convex-area": convex_area_mask, + "convex-perimeter": convex_perimeter_mask, + } + if not consistency_check: + mask_info = { + "non-negative": non_negative_mask, + "solidity": solidity_mask, + "extent-max": extent_upper_bound_mask, + # + "area-max": area_max_mask, + "perimeter-max": perimeter_max_mask, + "area-min": area_min_mask, + "perimeter-min": perimeter_min_mask, + } + mask_list = [mask_info[key] for key in mask_info.keys()] + return np.all(mask_list, axis=0), X, Y, mask_info + + +def excavate(input_2d_array): + """# Return Boundaries for Masked Array + + Use X, Y directions only""" + from copy import deepcopy as dc + from numpy import nan_to_num, zeros_like, abs + + data_2d_array = dc(input_2d_array) + data_2d_array = nan_to_num(data_2d_array, nan=20) + # X-Gradient + x_grad = zeros_like(data_2d_array) + x_grad[:-1, :] = data_2d_array[1:, :] - data_2d_array[:-1, :] + x_grad[(abs(x_grad) > 10)] = 10 + x_grad[(abs(x_grad) < 10) & (abs(x_grad) > 0)] = 1 + x_grad[x_grad == 1] = 0.5 + x_grad[x_grad > 1] = 1 + # Y-Gradient + y_grad = zeros_like(data_2d_array) + y_grad[:, :-1] = data_2d_array[:, 1:] - data_2d_array[:, :-1] + y_grad[(abs(y_grad) > 10)] = 10 + y_grad[(abs(y_grad) < 10) & (abs(y_grad) > 0)] = 1 + y_grad[y_grad == 1] = 0.5 + y_grad[y_grad > 1] = 1 + return x_grad, y_grad + + +def excavate_extra(input_2d_array, N=1): + """# Return Boundaries for Masked Array + + Use all 8 directions""" + from copy import deepcopy as dc + from numpy import nan_to_num, zeros_like, abs + + data_2d_array = dc(input_2d_array) + data_2d_array = nan_to_num(data_2d_array, nan=20) + # X-Gradient + x_grad = zeros_like(data_2d_array) + x_grad[:-N, :] = data_2d_array[N:, :] - data_2d_array[:-N, :] + x_grad[(abs(x_grad) > 10)] = 10 + x_grad[(abs(x_grad) < 10) & (abs(x_grad) > 0)] = 1 + x_grad[x_grad == 1] = 0.5 + x_grad[x_grad > 1] = 1 + # Y-Gradient + y_grad = zeros_like(data_2d_array) + y_grad[:, :-N] = data_2d_array[:, N:] - data_2d_array[:, :-N] + y_grad[(abs(y_grad) > 10)] = 10 + y_grad[(abs(y_grad) < 10) & (abs(y_grad) > 0)] = 1 + y_grad[y_grad == 1] = 0.5 + y_grad[y_grad > 1] = 1 + # XY-Gradient + xy_grad = zeros_like(data_2d_array) + xy_grad[:-N, :-N] = data_2d_array[N:, N:] - data_2d_array[:-N, :-N] + xy_grad[(abs(xy_grad) > 10)] = 10 + xy_grad[(abs(xy_grad) < 10) & (abs(xy_grad) > 0)] = 1 + xy_grad[xy_grad == 1] = 0.5 + xy_grad[xy_grad > 1] = 1 + # X(-Y)-Gradient + yx_grad = zeros_like(data_2d_array) + yx_grad[:-N, :-N] = data_2d_array[N:, :-N] - data_2d_array[:-N, N:] + yx_grad[(abs(yx_grad) > 10)] = 10 + yx_grad[(abs(yx_grad) < 10) & (abs(yx_grad) > 0)] = 1 + yx_grad[yx_grad == 1] = 0.5 + yx_grad[yx_grad > 1] = 1 + xyn_grad = dc(yx_grad) + # (-X)Y-Gradient + xny_grad = zeros_like(data_2d_array) + xny_grad[:-N, :-N] = data_2d_array[:-N, N:] - data_2d_array[N:, :-N] + xny_grad[(abs(xy_grad) > 10)] = 10 + xny_grad[(abs(xy_grad) < 10) & (abs(xy_grad) > 0)] = 1 + xny_grad[xy_grad == 1] = 0.5 + xny_grad[xy_grad > 1] = 1 + # (-X)(-Y)-Gradient + xnyn_grad = zeros_like(data_2d_array) + xnyn_grad[:-N, :-N] = data_2d_array[:-N, :-N] - data_2d_array[N:, N:] + xnyn_grad[(abs(yx_grad) > 10)] = 10 + xnyn_grad[(abs(yx_grad) < 10) & (abs(yx_grad) > 0)] = 1 + xnyn_grad[yx_grad == 1] = 0.5 + xnyn_grad[yx_grad > 1] = 1 + return x_grad, y_grad, xy_grad, xyn_grad, xny_grad, xnyn_grad + + +def excavate_outline(arr, thickness=1): + """# Generate Transparency Mask with NaNs""" + from numpy import sum, abs, NaN + + outline = sum(abs(excavate_extra(arr, thickness)), axis=0) + outline[outline == 0] = NaN + outline[outline > 0] = 0 + return outline + + +def get_boundary_outline( + aae_model_object, + pixel_classification_file=None, + include_transition_regions=False, + border_thickness=3, + bin_count=800, + xrange=[-6.5, 6.5], + yrange=[-4.5, 4.5], + threshold=0.75, +): + """# Get Boundary Outlines""" + from copy import deepcopy + import numpy as np + + if pixel_classification_file is None: + pixel_classification_file = "../../extra_data/PhenotypePixels_65x45_800.npy" + base_classification = np.loadtxt(pixel_classification_file) + base_classification = base_classification.reshape((bin_count, bin_count, 4)) + max_classification_probability = np.zeros((bin_count, bin_count, 1)) + max_classification_probability[:, :, 0] = ( + np.max(base_classification, axis=-1) < threshold + ) + classes_with_include_transition_regions = np.concatenate( + [base_classification, max_classification_probability], axis=-1 + ) + if include_transition_regions: + phenotype_probabilities = deepcopy( + np.argsort(classes_with_include_transition_regions[:, :, :], axis=-1)[ + :, :, -1 + ] + ).astype(np.float32) + else: + phenotype_probabilities = deepcopy( + np.argsort(classes_with_include_transition_regions[:, :, :-1], axis=-1)[ + :, :, -1 + ] + ).astype(np.float32) + nonphysical_mask, _, _, _ = get_nonphysical_masks( + aae_model_object, xrange=xrange, yrange=yrange, bins=[bin_count, bin_count] + ) + nonphysical_mask = nonphysical_mask.astype(np.float32) + nonphysical_mask[nonphysical_mask == 0] = np.NaN + nonphysical_mask[nonphysical_mask == 1] = 0 + nonphysical_mask = nonphysical_mask.T + phenotype_regions = deepcopy(phenotype_probabilities.T + nonphysical_mask.T) + outline = excavate_outline(phenotype_regions, border_thickness) + return outline + + +def apply_boundary( + model_loc=DIR_ROOT + "models/current_model/", + border_thickness=3, + include_transition_regions=False, + threshold=0.7, + alpha=1, + _plt=None, +): + """# Apply Boundary to Plot + + Use Pregenerated Boundary by Default for Speed""" + from ..models import load_aae + from sunlab.common.scaler import MaxAbsScaler + import numpy as np + + if _plt is None: + _plt = plt + if (model_loc == model_loc) and (border_thickness == 3) and (threshold == 0.7): + XYM = np.load(DIR_ROOT + "extra_data/OutlineXYM.npy") + XY = XYM[:2, :, :] + if include_transition_regions: + outline = XYM[3, :, :] + else: + outline = XYM[2, :, :] + _plt.pcolor(XY[0, :, :], XY[1, :, :], outline, cmap="gray", alpha=alpha) + return + model = load_aae(model_loc, MaxAbsScaler) + bin_count = 800 + xrange = [-6.5, 6.5] + yrange = [-4.5, 4.5] + rng = [xrange, yrange] + X = np.linspace(rng[0][0], rng[0][1], bin_count) + Y = np.linspace(rng[1][0], rng[1][1], bin_count) + XY = np.array(np.meshgrid(X, Y)) + kwparams = { + "bin_count": bin_count, + "xrange": xrange, + "yrange": yrange, + } + + include_tregions = include_transition_regions + outline = get_boundary_outline( + model, + border_thickness=border_thickness, + include_transition_regions=include_tregions, + threshold=threshold, + **kwparams + ) + _plt.pcolor(XY[0, :, :], XY[1, :, :], outline, cmap="gray", alpha=alpha) + + +plt.apply_boundary = apply_boundary + + +def plot_shape_dataset(self, model, *args, **kwargs): + """# Plot Shape Dataset""" + if self.labels is None: + plt.scatter2d(model.encoder(self.dataset), *args, **kwargs) + else: + plt.scatter2d(model.encoder(self.dataset), self.labels, *args, **kwargs) + + +ShapeDataset.plot = lambda model, *args, **kwargs: plot_shape_dataset( + model, *args, **kwargs +) |