Commit c34000ab authored by lucas_miranda's avatar lucas_miranda
Browse files

Added docstrings and fixed formatting issues

parent ddf04f84
# @author lucasmiranda42
# encoding: utf-8
# module deepof
"""
Functions and general utilities for the deepof tensorflow models. See documentation for details
"""
from itertools import combinations
from tensorflow.keras import backend as K
......@@ -11,42 +19,6 @@ import tensorflow_probability as tfp
# Shorthand aliases for the TensorFlow Probability distribution and layer APIs,
# used throughout the deepof model definitions.
tfd = tfp.distributions
tfpl = tfp.layers
# Connectivity for DLC models
def connect_mouse_topview(animal_id: str = None) -> nx.Graph:
    """Creates a nx.Graph object with the connectivity of the body parts in the
    DLC topview model for a single mouse. Used later for angle computing, among others.

        Parameters:
            - animal_id (str): if more than one animal is tagged,
            specify the animal identifier as a string

        Returns:
            - connectivity (nx.Graph)"""

    # Adjacency mapping: each body part is connected to every part in its list.
    connectivity = {
        "Nose": ["Left_ear", "Right_ear", "Spine_1"],
        "Left_ear": ["Right_ear", "Spine_1"],
        "Right_ear": ["Spine_1"],
        "Spine_1": ["Center", "Left_fhip", "Right_fhip"],
        "Center": ["Left_fhip", "Right_fhip", "Spine_2", "Left_bhip", "Right_bhip"],
        "Spine_2": ["Left_bhip", "Right_bhip", "Tail_base"],
        "Tail_base": ["Tail_1", "Left_bhip", "Right_bhip"],
        "Tail_1": ["Tail_2"],
        "Tail_2": ["Tail_tip"],
    }

    connectivity = nx.Graph(connectivity)

    # Prefix every node label with the animal identifier so that graphs for
    # several animals can coexist in a single multi-animal project.
    if animal_id:
        mapping = {
            node: "{}_{}".format(animal_id, node) for node in connectivity.nodes()
        }
        nx.relabel_nodes(connectivity, mapping, copy=False)

    return connectivity
# Helper functions
@tf.function
def far_away_uniform_initialiser(shape, minval=0, maxval=15, iters=100000):
......
# @author lucasmiranda42
# encoding: utf-8
# module deepof
"""
Data structures for preprocessing and wrangling of DLC output data.
- project: initial structure for specifying the characteristics of the project.
- coordinates: result of running the project. In charge of calling all relevant
computations for getting the data into the desired shape
- table_dict: python dict subclass for storing experimental instances as pandas.DataFrames.
Contains methods for generating training and test sets ready for model training.
"""
import warnings
from collections import defaultdict
from copy import deepcopy
from deepof.utils import *
from deepof.visuals import *
from pandas_profiling import ProfileReport
from sklearn import random_projection
from sklearn.decomposition import KernelPCA
from sklearn.manifold import TSNE
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import os
import warnings
import networkx as nx
from deepof.utils import *
from deepof.visuals import *
from deepof.model_utils import connect_mouse_topview
# DEFINE CUSTOM ANNOTATED TYPES #
# Opaque aliases for the project's own classes, used in signatures below;
# NewType keeps type hints readable without circular references to the classes.
Coordinates = NewType("Coordinates", Any)
Table_dict = NewType("Table_dict", Any)
# CLASSES FOR PREPROCESSING AND DATA WRANGLING
class project:
"""
Class for loading and preprocessing DLC data of individual and social mice.
Class for loading and preprocessing DLC data of individual and multiple animals. All main computations are called
here.
"""
def __init__(
self,
video_format=".mp4",
table_format=".h5",
path=".",
exp_conditions=None,
subset_condition=None,
arena="circular",
smooth_alpha=0.1,
arena_dims=(1,),
distances="All",
ego=False,
angles=True,
model="mouse_topview",
video_format: str = ".mp4",
table_format: str = ".h5",
path: str = ".",
exp_conditions: dict = None,
subset_condition: list = None,
arena: str = "circular",
smooth_alpha: float = 0.1,
arena_dims: tuple = (1,),
distances: str = "All",
ego: str = False,
angles: bool = True,
model: str = "mouse_topview",
):
self.path = path
......@@ -71,7 +89,7 @@ class project:
else:
return "DLC analysis of {} videos".format(len(self.videos))
def load_tables(self, verbose=False):
def load_tables(self, verbose: bool = False) -> Tuple:
"""Loads videos and tables into dictionaries"""
if self.table_format not in [".h5", ".csv"]:
......@@ -163,7 +181,7 @@ class project:
return tab_dict, lik_dict
@property
def get_scale(self):
def get_scale(self) -> np.array:
"""Returns the arena as recognised from the videos"""
if self.arena in ["circular"]:
......@@ -180,7 +198,7 @@ class project:
)[0]
* 2
)
+ self.arena_dims
+ list(self.arena_dims)
)
else:
......@@ -188,8 +206,8 @@ class project:
return np.array(scales)
def get_distances(self, tab_dict, verbose=False):
"""Computes the distances between all selected bodyparts over time.
def get_distances(self, tab_dict: dict, verbose: bool = False) -> dict:
"""Computes the distances between all selected body parts over time.
If ego is provided, it only returns distances to a specified bodypart"""
if verbose:
......@@ -223,7 +241,7 @@ class project:
return distance_dict
def get_angles(self, tab_dict, verbose):
def get_angles(self, tab_dict: dict, verbose: bool = False) -> dict:
"""
Computes all the angles between adjacent bodypart trios per video and per frame in the data.
......@@ -262,7 +280,7 @@ class project:
return angle_dict
def run(self, verbose=False):
def run(self, verbose: bool = False) -> Coordinates:
"""Generates a dataset using all the options specified during initialization"""
tables, quality = self.load_tables(verbose)
......@@ -292,17 +310,24 @@ class project:
class coordinates:
"""
Class for storing the results of a ran project. Methods are mostly setters and getters in charge of tidying up
the generated tables. For internal usage only.
"""
def __init__(
self,
tables,
videos,
arena,
arena_dims,
scales,
quality,
exp_conditions=None,
distances=None,
angles=None,
tables: dict,
videos: list,
arena: str,
arena_dims: np.array,
scales: np.array,
quality: dict,
exp_conditions: dict = None,
distances: dict = None,
angles: dict = None,
):
self._tables = tables
self.distances = distances
......@@ -323,8 +348,32 @@ class coordinates:
return "DLC analysis of {} videos".format(len(self._videos))
def get_coords(
self, center="arena", polar=False, speed=0, length=None, align=False
):
self,
center: str = "arena",
polar: bool = False,
speed: int = 0,
length: str = None,
align: bool = False,
) -> Table_dict:
"""
Returns a table_dict object with the coordinates of each animal as values.
Parameters:
- center (str): name of the body part to which the positions will be centered.
If false, the raw data is returned; if 'arena' (default), coordinates are
centered in the arena
- polar (bool): states whether the coordinates should be converted to polar values
- speed (int): states the derivative of the positions to report. Speed is returned if 1,
acceleration if 2, jerk if 3, etc.
- length (str): length of the video in a datetime compatible format (hh::mm:ss). If stated, the index
of the stored dataframes will reflect the actual timing in the video.
- align (bool): selects the body part to which later processes will align the frames with
(see preprocess in table_dict documentation).
Returns:
tab_dict (Table_dict): table_dict object containing all the computed information
"""
tabs = deepcopy(self._tables)
if polar:
......@@ -391,7 +440,7 @@ class coordinates:
for key, tab in tabs.items():
tabs[key].index = pd.timedelta_range(
"00:00:00", length, periods=tab.shape[0] + 1, closed="left"
)
).astype('timedelta64[s]')
if align:
assert (
......@@ -416,7 +465,19 @@ class coordinates:
polar=polar,
)
def get_distances(self, speed=0, length=None):
def get_distances(self, speed: int = 0, length: str = None) -> Table_dict:
"""
Returns a table_dict object with the distances between body parts of each animal as values.
Parameters:
- speed (int): states the derivative of the positions to report. Speed is returned if 1,
acceleration if 2, jerk if 3, etc.
- length (str): length of the video in a datetime compatible format (hh::mm:ss). If stated, the index
of the stored dataframes will reflect the actual timing in the video.
Returns:
tab_dict (Table_dict): table_dict object containing all the computed information
"""
tabs = deepcopy(self.distances)
......@@ -431,7 +492,7 @@ class coordinates:
for key, tab in tabs.items():
tabs[key].index = pd.timedelta_range(
"00:00:00", length, periods=tab.shape[0] + 1, closed="left"
)
).astype('timedelta64[s]')
return table_dict(tabs, typ="dists")
......@@ -439,7 +500,22 @@ class coordinates:
"Distances not computed. Read the documentation for more details"
)
def get_angles(self, degrees=False, speed=0, length=None):
def get_angles(
self, degrees: bool = False, speed: int = 0, length: str = None
) -> Table_dict:
"""
Returns a table_dict object with the angles between body parts of each animal as values.
Parameters:
- degrees (bool): if True, returns the angles in degrees. Radians (default) are returned otherwise.
- speed (int): states the derivative of the positions to report. Speed is returned if 1,
acceleration if 2, jerk if 3, etc.
- length (str): length of the video in a datetime compatible format (hh::mm:ss). If stated, the index
of the stored dataframes will reflect the actual timing in the video.
Returns:
tab_dict (Table_dict): table_dict object containing all the computed information
"""
tabs = deepcopy(self.angles)
......@@ -456,13 +532,15 @@ class coordinates:
for key, tab in tabs.items():
tabs[key].index = pd.timedelta_range(
"00:00:00", length, periods=tab.shape[0] + 1, closed="left"
)
).astype('timedelta64[s]')
return table_dict(tabs, typ="angles")
raise ValueError("Angles not computed. Read the documentation for more details")
def get_videos(self, play=False):
def get_videos(self, play: bool = False):
"""Returns the videos associated with the dataset as a list."""
if play:
raise NotImplementedError
......@@ -470,9 +548,13 @@ class coordinates:
@property
def get_exp_conditions(self):
    """Getter for the stored per-subject experimental-condition mapping."""
    conditions = self._exp_conditions
    return conditions
def get_quality(self, report=False):
def get_quality(self, report: bool = False):
"""Retrieves a dictionary with the tagging quality per video, as reported by DLC"""
if report:
profile = ProfileReport(
self._quality[report],
......@@ -484,11 +566,31 @@ class coordinates:
@property
def get_arenas(self):
    """Getter for all stored arena information: type, dimensions and recognised scales."""
    return (self._arena, self._arena_dims, self._scales)
def rule_based_annotation(self):
    # Placeholder: rule-based behavioural annotation is not implemented yet
    # (tracked in this module's TODO comments). Currently a deliberate no-op.
    pass
class table_dict(dict):
def __init__(self, tabs, typ, arena=None, arena_dims=None, center=None, polar=None):
"""
Main class for storing a single dataset as a dictionary with individuals as keys and pandas.DataFrames as values.
Includes methods for generating training and testing datasets for the autoencoders.
"""
def __init__(
self,
tabs: Coordinates,
typ: str,
arena: str = None,
arena_dims: np.array = None,
center: str = None,
polar: bool = None,
):
super().__init__(tabs)
self._type = typ
self._center = center
......@@ -496,7 +598,7 @@ class table_dict(dict):
self._arena = arena
self._arena_dims = arena_dims
def filter(self, keys):
def filter(self, keys: list) -> Table_dict:
"""Returns a subset of the original table_dict object, containing only the specified keys. Useful, for example,
for selecting data coming from videos of a specified condition."""
......@@ -507,7 +609,10 @@ class table_dict(dict):
)
# noinspection PyTypeChecker
def plot_heatmaps(self, bodyparts, save=False, i=0):
def plot_heatmaps(
self, bodyparts: list, save: bool = False, i: int = 0
) -> plt.figure:
"""Plots heatmaps of the specified body parts (bodyparts) of the specified animal (i)"""
if self._type != "coords" or self._polar:
raise NotImplementedError(
......@@ -533,7 +638,9 @@ class table_dict(dict):
list(self.values())[i], bodyparts, xlim=x_lim, ylim=y_lim, save=save,
)
def get_training_set(self, test_videos=0):
def get_training_set(self, test_videos: int = 0) -> Tuple[np.ndarray, np.ndarray]:
"""Generates training and test sets as numpy.array objects for model training"""
rmax = max([i.shape[0] for i in self.values()])
raw_data = np.array(
[np.pad(v, ((0, rmax - v.shape[0]), (0, 0))) for v in self.values()]
......@@ -553,19 +660,48 @@ class table_dict(dict):
# noinspection PyTypeChecker,PyGlobalUndefined
def preprocess(
self,
window_size=1,
window_step=1,
scale="standard",
test_videos=0,
verbose=False,
conv_filter=None,
sigma=1.0,
shift=0.0,
shuffle=False,
align=False,
):
"""Builds a sliding window. If specified, splits train and test and
Z-scores the data using sklearn's standard scaler"""
window_size: int = 1,
window_step: int = 1,
scale: str = "standard",
test_videos: int = 0,
verbose: bool = False,
conv_filter: bool = None,
sigma: float = 1.0,
shift: float = 0.0,
shuffle: bool = False,
align: str = False,
) -> np.ndarray:
"""
Main method for preprocessing the loaded dataset. Capable of returning training
and test sets ready for model training.
Parameters:
- window_size (int): Size of the sliding window to pass through the data to generate training instances
- window_step (int): Step to take when sliding the window. If 1, a true sliding window is used;
if equal to window_size, the data is split into non-overlapping chunks.
- scale (str): Data scaling method. Must be one of 'standard' (default; recommended) and 'minmax'.
- test_videos (int): Number of videos to use when generating the test set.
If 0, no test set is generated (not recommended).
- verbose (bool): prints job information if True
- conv_filter (bool): must be one of None, 'gaussian'. If not None, convolves each instance
with the specified kernel.
- sigma (float): usable only if conv_filter is 'gaussian'. Standard deviation of the kernel to use.
- shift (float): usable only if conv_filter is 'gaussian'. Shift from mean zero of the kernel to use.
- shuffle (bool): Shuffles the data instances if True. In most use cases, it should be True for training
and False for prediction.
- align (bool): If "all", rotates all data instances to align the center -> align (selected before
when calling get_coords) axis with the y-axis of the cartesian plane. If 'center', rotates all instances
using the angle of the central frame of the sliding window. This way rotations of the animal are caught
as well. It doesn't do anything if False.
Returns:
- X_train (np.ndarray): 3d dataset with shape (instances, sliding_window_size, features)
generated from all training videos
- X_test (np.ndarray): 3d dataset with shape (instances, sliding_window_size, features)
generated from all test videos (if test_videos > 0)
"""
global g
X_train, X_test = self.get_training_set(test_videos)
......@@ -651,7 +787,12 @@ class table_dict(dict):
return X_train
def random_projection(self, n_components=None, sample=1000):
def random_projection(
self, n_components: int = None, sample: int = 1000
) -> Tuple[Any, Any]:
"""Returns a training set generated from the 2D original data (time x features) and a random projection
to a n_components space. The sample parameter allows the user to randomly pick a subset of the data for
performance or visualization reasons"""
X = self.get_training_set()[0]
X = X[np.random.choice(X.shape[0], sample, replace=False), :]
......@@ -661,7 +802,12 @@ class table_dict(dict):
return X, rproj
def pca(self, n_components=None, sample=1000, kernel="linear"):
def pca(
self, n_components: int = None, sample: int = 1000, kernel: str = "linear"
) -> Tuple[Any, Any]:
"""Returns a training set generated from the 2D original data (time x features) and a PCA projection
to a n_components space. The sample parameter allows the user to randomly pick a subset of the data for
performance or visualization reasons"""
X = self.get_training_set()[0]
X = X[np.random.choice(X.shape[0], sample, replace=False), :]
......@@ -671,7 +817,12 @@ class table_dict(dict):
return X, pca
def tsne(self, n_components=None, sample=1000, perplexity=30):
def tsne(
self, n_components: int = None, sample: int = 1000, perplexity: int = 30
) -> Tuple[Any, Any]:
"""Returns a training set generated from the 2D original data (time x features) and a PCA projection
to a n_components space. The sample parameter allows the user to randomly pick a subset of the data for
performance or visualization reasons"""
X = self.get_training_set()[0]
X = X[np.random.choice(X.shape[0], sample, replace=False), :]
......@@ -709,3 +860,6 @@ def merge_tables(*args):
# - Generate ragged training array using a metric (acceleration, maybe?)
# - Use something like Dynamic Time Warping to put all instances in the same length
# - add rule_based_annotation method to coordinates class!!
# - with the current implementation, preprocess can't fully work on merged table_dict instances.
# While some operations (mainly alignment) should be carried out before merging, others require
# the whole dataset to function properly.
# @author lucasmiranda42
# encoding: utf-8
# module deepof
"""
Functions and general utilities for the deepof package. See documentation for details
"""
import cv2
import matplotlib.pyplot as plt
......@@ -24,6 +32,43 @@ from typing import Tuple, Any, List, Union, NewType
Coordinates = NewType("Coordinates", Any)
# CONNECTIVITY FOR DLC MODELS
def connect_mouse_topview(animal_id=None) -> nx.Graph:
    """Builds the body-part connectivity graph of the DLC topview model for a
    single mouse. Used later for angle computing, among others.

        Parameters:
            - animal_id (str): if more than one animal is tagged,
            specify the animal identifier as a string

        Returns:
            - connectivity (nx.Graph)"""

    # Undirected skeleton edges, listed in the same insertion order as the
    # original adjacency mapping so downstream node iteration is unchanged.
    edges = [
        ("Nose", "Left_ear"), ("Nose", "Right_ear"), ("Nose", "Spine_1"),
        ("Left_ear", "Right_ear"), ("Left_ear", "Spine_1"),
        ("Right_ear", "Spine_1"),
        ("Spine_1", "Center"), ("Spine_1", "Left_fhip"), ("Spine_1", "Right_fhip"),
        ("Center", "Left_fhip"), ("Center", "Right_fhip"), ("Center", "Spine_2"),
        ("Center", "Left_bhip"), ("Center", "Right_bhip"),
        ("Spine_2", "Left_bhip"), ("Spine_2", "Right_bhip"), ("Spine_2", "Tail_base"),
        ("Tail_base", "Tail_1"), ("Tail_base", "Left_bhip"), ("Tail_base", "Right_bhip"),
        ("Tail_1", "Tail_2"),
        ("Tail_2", "Tail_tip"),
    ]

    connectivity = nx.Graph(edges)

    if animal_id:
        # Rename every node in place, prefixing it with the animal identifier.
        renamed = {}
        for node in connectivity.nodes():
            renamed[node] = "{}_{}".format(animal_id, node)
        nx.relabel_nodes(connectivity, renamed, copy=False)

    return connectivity
# QUALITY CONTROL AND PREPROCESSING #
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment