Commit 31f3164b authored by lucas_miranda's avatar lucas_miranda
Browse files

Rename project and all files. Creates __init__.py

parent e120b6e7
/DLC_social_1_exp_conditions.pickle
/examples/.ipynb_checkpoints/
This diff is collapsed.
This diff is collapsed.
# __init__ file of the acrona project
from collections import defaultdict
from copy import deepcopy
from pandarallel import pandarallel
from pandas_profiling import ProfileReport
from sklearn import random_projection
from sklearn.decomposition import KernelPCA
from sklearn.manifold import TSNE
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from source.utils import *
from tqdm import tqdm
import os
import warnings
class get_coordinates:
    """Loads and preprocesses DLC (DeepLabCut) data of individual and social mice.

    Expects ``path`` to contain a ``Videos/`` and a ``Tables/`` subdirectory
    whose file stems match one another (checked in the constructor).

    Parameters:
        video_format (str): extension of the video files to load.
        table_format (str): extension of the tracking tables (".h5" or ".csv").
        path (str): root directory containing Videos/ and Tables/.
        exp_conditions (dict | bool): optional mapping of experimental conditions.
        arena (str): arena type; only "circular" is currently supported.
        arena_dims (list | None): physical arena dimensions; defaults to [1].
        smooth_alpha (float): exponential smoothing factor; falsy disables smoothing.
        p (int): number of parallel workers used when computing distances.
        center_coords (bool): stored flag indicating whether to center coordinates.
        distances (str | list | bool): body parts to compute distances for, or "All".
        ego (str | bool): if set, keep only distances involving this body part.
    """

    def __init__(
        self,
        video_format=".mp4",
        table_format=".h5",
        path=".",
        exp_conditions=False,
        arena="circular",
        arena_dims=None,
        smooth_alpha=0.1,
        p=1,
        center_coords=True,
        distances=False,
        ego=False,
    ):
        # None sentinel instead of the previous mutable default list ([1]),
        # which would have been shared across all instances.
        if arena_dims is None:
            arena_dims = [1]
        self.path = path
        self.video_path = self.path + "Videos/"
        self.table_path = self.path + "Tables/"
        self.videos = sorted(
            [vid for vid in os.listdir(self.video_path) if vid.endswith(video_format)]
        )
        self.tables = sorted(
            [tab for tab in os.listdir(self.table_path) if tab.endswith(table_format)]
        )
        self.exp_conditions = exp_conditions
        self.table_format = table_format
        self.video_format = video_format
        self.arena = arena
        self.arena_dims = arena_dims
        self.smooth_alpha = smooth_alpha
        self.p = p
        self.center_coords = center_coords
        self.distances = distances
        self.ego = ego
        # Property access: recognizes the arena in every video up front
        self.scales = self.get_scale
        assert [re.findall("(.*)_", vid)[0] for vid in self.videos] == [
            re.findall("(.*)\.", tab)[0] for tab in self.tables
        ], "Video files should match table files"

    def __str__(self):
        if self.exp_conditions:
            return "DLC analysis of {} videos across {} conditions".format(
                len(self.videos), len(self.exp_conditions)
            )
        else:
            return "DLC analysis of {} videos".format(len(self.videos))

    def load_tables(self, verbose):
        """Loads videos and tables into dictionaries.

        Returns:
            table_dict (dict): animal id -> DataFrame of x/y coordinates.
            lik_dict (dict): animal id -> DataFrame of per-bodypart likelihoods.

        Raises:
            ValueError: if the configured table_format is unsupported.
        """
        if verbose:
            print("Loading and smoothing trajectories...")
        if self.table_format == ".h5":
            table_dict = {
                re.findall("(.*?)_", tab)[0]: pd.read_hdf(
                    self.table_path + tab, dtype=float
                )
                for tab in self.tables
            }
        elif self.table_format == ".csv":
            table_dict = {
                re.findall("(.*?)_", tab)[0]: pd.read_csv(
                    self.table_path + tab, dtype=float
                )
                for tab in self.tables
            }
        else:
            # Previously an unsupported format fell through to a NameError on
            # table_dict; fail early with a clear message instead.
            raise ValueError("table_format must be one of: '.h5', '.csv'")
        lik_dict = defaultdict()
        for key, value in table_dict.items():
            # Split coordinates from the DLC likelihood columns
            x = value.xs("x", level="coords", axis=1, drop_level=False)
            y = value.xs("y", level="coords", axis=1, drop_level=False)
            lik = value.xs("likelihood", level="coords", axis=1, drop_level=True)
            table_dict[key] = pd.concat([x, y], axis=1).sort_index(axis=1)
            lik_dict[key] = lik
        if self.smooth_alpha:
            for dframe in tqdm(table_dict.keys()):
                table_dict[dframe] = table_dict[dframe].apply(
                    lambda x: smooth_mult_trajectory(x, alpha=self.smooth_alpha), axis=0
                )
        # Drop the top column level (presumably the DLC scorer name — confirm)
        for key, tab in table_dict.items():
            table_dict[key] = tab[tab.columns.levels[0][0]]
        return table_dict, lik_dict

    @property
    def get_scale(self):
        """Returns the arena as recognised from the videos, one row per video.

        Raises:
            NotImplementedError: for any arena type other than 'circular'.
        """
        if self.arena in ["circular"]:
            scales = [
                list(
                    recognize_arena(
                        self.videos,
                        vid_index,
                        path=self.video_path,
                        arena_type=self.arena,
                    )
                    * 2
                )
                + self.arena_dims
                for vid_index, _ in enumerate(self.videos)
            ]
        else:
            raise NotImplementedError("arenas must be set to one of: 'circular'")
        return np.array(scales)

    def get_distances(self, table_dict, verbose=1):
        """Computes the distances between all selected bodyparts over time.
        If ego is provided, it only returns distances to a specified bodypart."""
        if verbose:
            print("Computing distance based coordinates...")
        distance_dict = defaultdict()
        pandarallel.initialize(nb_workers=self.p, verbose=verbose)
        nodes = self.distances
        if nodes == "All":
            nodes = table_dict[list(table_dict.keys())[0]].columns.levels[0]
        assert [
            i in list(table_dict.values())[0].columns.levels[0] for i in nodes
        ], "Nodes should correspond to existent bodyparts"
        # Columns 2: of scales hold the scaling factors used by bpart_distance
        scales = self.scales[:, 2:]
        for ind, key in tqdm(
            enumerate(table_dict.keys()), total=len(table_dict.keys())
        ):
            distance_dict[key] = table_dict[key][nodes].parallel_apply(
                lambda x: bpart_distance(x, nodes, scales[ind][1], scales[ind][0]),
                axis=1,
            )
        if self.ego:
            # Keep only distances whose column label mentions the ego bodypart
            for key, val in distance_dict.items():
                distance_dict[key] = val.loc[
                    :, [dist for dist in val.columns if self.ego in dist]
                ]
        return distance_dict

    def get_angles(self, velocities=0):
        """Computes the angles between all selected bodyparts over time.
        If ego is provided, it only returns angles to a specified bodypart"""
        # NOTE(review): not yet implemented — docstring-only stub, returns None.

    def run(self, verbose=1):
        """Generates a dataset using all the options specified during initialization"""
        tables, quality = self.load_tables(verbose)
        distances = None
        if self.distances:
            distances = self.get_distances(tables, verbose)
        if verbose == 1:
            print("Done!")
        return coordinates(
            tables,
            self.videos,
            self.arena,
            self.arena_dims,
            self.scales,
            quality,
            self.exp_conditions,
            distances,
        )
class coordinates:
    """Container for loaded coordinate tables, distances and metadata.

    Produced by ``get_coordinates.run``; exposes accessors that wrap the raw
    tables in ``table_dict`` instances.
    """

    def __init__(
        self,
        tables,
        videos,
        arena,
        arena_dims,
        scales,
        quality,
        exp_conditions=None,
        distances=None,
    ):
        self._tables = tables
        self.distances = distances
        self._videos = videos
        self._exp_conditions = exp_conditions
        self._arena = arena
        self._arena_dims = arena_dims
        self._scales = scales
        self._quality = quality

    def __str__(self):
        if self._exp_conditions:
            return "Coordinates of {} videos across {} conditions".format(
                len(self._videos), len(self._exp_conditions)
            )
        else:
            return "DLC analysis of {} videos".format(len(self._videos))

    def get_coords(self, center=True, polar=False):
        """Returns a table_dict of coordinates, optionally centered on the
        arena and/or converted to polar form."""
        tabs = deepcopy(self._tables)
        if center:
            if self._arena == "circular":
                # Shift x and y by half the recognised arena size per video
                for i, (key, value) in enumerate(tabs.items()):
                    value.loc[:, (slice("coords"), ["x"])] = value.loc[
                        :, (slice("coords"), ["x"])
                    ].applymap(lambda x: x - self._scales[i][0] / 2)
                    value.loc[:, (slice("coords"), ["y"])] = value.loc[
                        :, (slice("coords"), ["y"])
                    ].applymap(lambda y: y - self._scales[i][1] / 2)
        if polar:
            for key, tab in tabs.items():
                tabs[key] = tab2polar(tab)
        return table_dict(
            tabs,
            "coords",
            arena=self._arena,
            arena_dims=self._scales,
            center=center,
            polar=polar,
        )

    def get_distances(self):
        """Returns a table_dict of pre-computed bodypart distances.

        Raises:
            ValueError: if distances were not computed during loading.
        """
        # Identity check against the None sentinel (was `!= None`)
        if self.distances is not None:
            return table_dict(self.distances, typ="dist")
        raise ValueError(
            "Distances not computed. Read the documentation for more details"
        )

    def get_videos(self, play=False):
        """Returns the list of video file names; playback is not implemented."""
        if play:
            raise NotImplementedError
        return self._videos

    @property
    def get_exp_conditions(self):
        """Experimental-condition mapping passed at load time (may be None)."""
        return self._exp_conditions

    def get_quality(self, report=False):
        """Returns the likelihood tables; with `report` set to a key, returns
        a pandas-profiling report of that table instead."""
        if report:
            profile = ProfileReport(
                self._quality[report],
                title="Quality Report, {}".format(report),
                html={"style": {"full_width": True}},
            )
            return profile
        return self._quality

    @property
    def get_arenas(self):
        """Arena type, dimensions and per-video recognised scales."""
        return self._arena, self._arena_dims, self._scales
class table_dict(dict):
    """dict subclass holding one table per animal, plus plotting and
    preprocessing utilities for model training."""

    def __init__(self, tabs, typ, arena=None, arena_dims=None, center=None, polar=None):
        super().__init__(tabs)
        self._type = typ
        self._center = center
        self._polar = polar
        self._arena = arena
        self._arena_dims = arena_dims

    def plot_heatmaps(self, bodyparts, save=False, i=0):
        """Plots positional heatmaps of the given bodyparts for the i-th table.
        Only available for cartesian (non-polar) coordinate tables."""
        if self._type != "coords" or self._polar:
            raise NotImplementedError(
                "Heatmaps only available for cartesian coordinates. Set polar to False in get_coordinates and try again"
            )
        if not self._center:
            warnings.warn(
                "Heatmaps look better if you center the data. Set center=True in get_coords and rerun this function to give it a try!"
            )
        if self._arena == "circular":
            # Centered data is symmetric around 0; raw data spans [0, dim]
            x_lim = (
                [-self._arena_dims[i][2] / 2, self._arena_dims[i][2] / 2]
                if self._center
                else [0, self._arena_dims[i][0]]
            )
            y_lim = (
                [-self._arena_dims[i][2] / 2, self._arena_dims[i][2] / 2]
                if self._center
                else [0, self._arena_dims[i][1]]
            )
            plot_heatmap(
                list(self.values())[i], bodyparts, xlim=x_lim, ylim=y_lim, save=save,
            )

    def get_training_set(self):
        """Concatenates all tables into one array, zero-padding each table to
        the row count of the longest one."""
        rmax = max([i.shape[0] for i in self.values()])
        X_train = np.concatenate(
            [np.pad(v, ((0, rmax - v.shape[0]), (0, 0))) for v in self.values()]
        )
        return X_train

    def preprocess(
        self,
        window_size=1,
        window_step=1,
        scale=True,
        test_proportion=0,
        random_state=None,
        verbose=False,
    ):
        """Builds a sliding window. If desired, splits train and test and
        Z-scores the data using sklearn's standard scaler.

        Returns:
            X_train array, plus X_test when test_proportion is truthy.
        """
        X_train = self.get_training_set()
        if test_proportion:
            if verbose:
                print("Splitting train and test...")
            X_train, X_test = train_test_split(
                X_train, test_size=test_proportion, random_state=random_state
            )
        if scale:
            if verbose:
                print("Scaling data...")
            scaler = StandardScaler()
            # Scale on a 2D view, then restore the original shape
            X_train = scaler.fit_transform(
                X_train.reshape(-1, X_train.shape[-1])
            ).reshape(X_train.shape)
            # Sanity checks on the standardized output (stripped under -O)
            assert np.allclose(np.mean(X_train), 0)
            assert np.allclose(np.std(X_train), 1)
            if test_proportion:
                X_test = scaler.transform(X_test.reshape(-1, X_test.shape[-1])).reshape(
                    X_test.shape
                )
            if verbose:
                print("Done!")
        X_train = rolling_window(X_train, window_size, window_step)
        if test_proportion:
            X_test = rolling_window(X_test, window_size, window_step)
            return X_train, X_test
        return X_train

    def random_projection(self, n_components=None, sample=1000):
        """Gaussian random projection of a random sample of training rows."""
        X = self.get_training_set()
        # Clamp to available rows: np.random.choice with replace=False raises
        # ValueError when sample exceeds the population size.
        sample = min(sample, X.shape[0])
        X = X[np.random.choice(X.shape[0], sample, replace=False), :]
        rproj = random_projection.GaussianRandomProjection(n_components=n_components)
        X = rproj.fit_transform(X)
        return X, rproj

    def pca(self, n_components=None, sample=1000, kernel="linear"):
        """Kernel PCA of a random sample of training rows."""
        X = self.get_training_set()
        sample = min(sample, X.shape[0])  # see random_projection
        X = X[np.random.choice(X.shape[0], sample, replace=False), :]
        pca = KernelPCA(n_components=n_components, kernel=kernel)
        X = pca.fit_transform(X)
        return X, pca

    def tSNE(self, n_components=None, sample=1000):
        """t-SNE embedding of a random sample of training rows."""
        X = self.get_training_set()
        sample = min(sample, X.shape[0])  # see random_projection
        X = X[np.random.choice(X.shape[0], sample, replace=False), :]
        tsne = TSNE(n_components=n_components)
        X = tsne.fit_transform(X)
        return X, tsne
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import LSTM, Bidirectional
from tensorflow.keras.layers import Dense, Layer
from tensorflow.keras.layers import RepeatVector, Dropout
from tensorflow.keras.layers import TimeDistributed
from tensorflow.keras.constraints import UnitNorm, Constraint
from tensorflow.keras import Sequential
from keras import backend as K
from kerastuner import HyperModel
# Custom layers for efficiency
class DenseTranspose(Layer):
    """Dense layer that reuses (transposed) the kernel of a tied Dense layer,
    adding its own bias — used to build weight-tied autoencoder decoders."""

    def __init__(self, dense, output_dim, activation=None, **kwargs):
        # The Dense layer whose kernel is shared (transposed) at call time
        self.dense = dense
        self.output_dim = output_dim
        self.activation = tf.keras.activations.get(activation)
        super().__init__(**kwargs)

    def get_config(self):
        config = super().get_config().copy()
        config.update(
            {
                # NOTE(review): `dense` is a Layer instance and is not itself
                # JSON-serializable; saving models with this layer will need a
                # custom strategy — confirm before relying on model.save().
                "dense": self.dense,
                "output_dim": self.output_dim,
                # Serialize the activation to its string identifier; the raw
                # callable (as previously stored) cannot round-trip a config.
                "activation": tf.keras.activations.serialize(self.activation),
            }
        )
        return config

    def build(self, batch_input_shape):
        # Only the bias is a new weight here; the kernel belongs to self.dense
        self.biases = self.add_weight(
            name="bias", shape=[self.dense.input_shape[-1]], initializer="zeros"
        )
        super().build(batch_input_shape)

    def call(self, inputs):
        # y = activation(x · Wᵀ + b), W being the tied layer's first weight
        z = tf.matmul(inputs, self.dense.weights[0], transpose_b=True)
        return self.activation(z + self.biases)

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.output_dim)
class UncorrelatedFeaturesConstraint(Constraint):
    """Penalty that discourages correlation between the features of an
    encoding layer, by summing the squared off-diagonal entries of their
    covariance matrix (used as an activity regularizer in SEQ_2_SEQ_AE)."""

    def __init__(self, encoding_dim, weightage=1.0):
        # Number of features in the constrained layer's output
        self.encoding_dim = encoding_dim
        # Multiplier applied to the decorrelation penalty
        self.weightage = weightage

    def get_config(self):
        config = super().get_config().copy()
        config.update(
            {"encoding_dim": self.encoding_dim, "weightage": self.weightage,}
        )
        return config

    def get_covariance(self, x):
        """Returns the (encoding_dim x encoding_dim) covariance matrix of the
        batch activations x (features are mean-centered column-wise)."""
        x_centered_list = []
        for i in range(self.encoding_dim):
            x_centered_list.append(x[:, i] - K.mean(x[:, i]))
        # x_centered has shape (encoding_dim, batch)
        x_centered = tf.stack(x_centered_list)
        # NOTE(review): the divisor is x_centered.get_shape()[0], i.e. the
        # number of features (encoding_dim), not the batch size — a textbook
        # covariance would divide by the sample count. Confirm this scaling
        # is intentional before changing it.
        covariance = K.dot(x_centered, K.transpose(x_centered)) / tf.cast(
            x_centered.get_shape()[0], tf.float32
        )
        return covariance

    # Constraint penalty
    def uncorrelated_feature(self, x):
        """Sum of squared off-diagonal covariance entries; 0.0 when there is
        a single feature (nothing to decorrelate)."""
        if self.encoding_dim <= 1:
            return 0.0
        else:
            # Zero out the diagonal, then sum the squared remainder
            output = K.sum(
                K.square(
                    self.covariance
                    - tf.math.multiply(self.covariance, K.eye(self.encoding_dim))
                )
            )
            return output

    def __call__(self, x):
        # Cache the covariance so uncorrelated_feature can reuse it
        self.covariance = self.get_covariance(x)
        return self.weightage * self.uncorrelated_feature(x)
class SEQ_2_SEQ_AE(HyperModel):
def __init__(self, input_shape):
self.input_shape = input_shape
def build(self, hp):
# Hyperparameters to tune
CONV_filters = hp.Int(
"units_conv", min_value=32, max_value=256, step=32, default=256
)
LSTM_units_1 = hp.Int(
"units_lstm", min_value=128, max_value=512, step=32, default=256
)
LSTM_units_2 = int(LSTM_units_1 / 2)
DENSE_1 = int(LSTM_units_2)
DENSE_2 = hp.Int(
"units_dense1", min_value=32, max_value=256, step=32, default=64
)
DROPOUT_RATE = hp.Float(
"dropout_rate", min_value=0.0, max_value=0.5, default=0.25, step=0.05
)
ENCODING = hp.Int(
"units_dense2", min_value=32, max_value=128, step=32, default=32
)
# Encoder Layers
Model_E0 = tf.keras.layers.Conv1D(
filters=CONV_filters,
kernel_size=5,
strides=1,
padding="causal",
activation="relu",
input_shape=self.input_shape[1:],
)
Model_E1 = Bidirectional(
LSTM(
LSTM_units_1,
activation="tanh",
return_sequences=True,
kernel_constraint=UnitNorm(axis=0),
)
)
Model_E2 = Bidirectional(
LSTM(
LSTM_units_2,
activation="tanh",
return_sequences=False,
kernel_constraint=UnitNorm(axis=0),
)
)
Model_E3 = Dense(DENSE_1, activation="relu", kernel_constraint=UnitNorm(axis=0))
Model_E4 = Dense(DENSE_2, activation="relu", kernel_constraint=UnitNorm(axis=0))
Model_E5 = Dense(
ENCODING,
activation="relu",
kernel_constraint=UnitNorm(axis=1),
activity_regularizer=UncorrelatedFeaturesConstraint(3, weightage=1.0),
)
# Decoder layers
Model_D4 = Bidirectional(
LSTM(
LSTM_units_1,
activation="tanh",
return_sequences=True,
kernel_constraint=UnitNorm(axis=1),
)
)
Model_D5 = Bidirectional(
LSTM(
LSTM_units_1,
activation="sigmoid",
return_sequences=True,
kernel_constraint=UnitNorm(axis=1),
)
)
# Define and instantiate encoder
encoder = Sequential(name="DLC_encoder")
encoder.add(Model_E0)
encoder.add(Model_E1)
encoder.add(Model_E2)
encoder.add(Model_E3)
encoder.add(Dropout(DROPOUT_RATE))
encoder.add(Model_E4)
encoder.add(Model_E5)
# Define and instantiate decoder
decoder = Sequential(name="DLC_Decoder")
decoder.add(
DenseTranspose(
Model_E5, activation="relu", input_shape=(ENCODING,), output_dim=64
)
)
decoder.add(DenseTranspose(Model_E4, activation="relu", output_dim=128))
decoder.add(DenseTranspose(Model_E3, activation="relu", output_dim=256))
decoder.add(RepeatVector(self.input_shape[1]))
decoder.add(Model_D4)
decoder.add(Model_D5)
decoder.add(TimeDistributed(Dense(self.input_shape[2])))
model = Sequential([encoder, decoder], name="DLC_Autoencoder")
model.compile(
loss=tf.keras.losses.Huber(reduction="sum", delta=100.0),
optimizer=Adam(