Commit 29d117c2 authored by lucas_miranda's avatar lucas_miranda
Browse files

Added speed computing tests

parent ca2088d2
......@@ -90,8 +90,10 @@ def compute_dist(
- result (pd.DataFrame): pandas.DataFrame with the
absolute distances between a pair of body parts"""
a, b = pair_array[:, :2], pair_array[:, 2:]
lim = 2 if pair_array.shape[1] == 4 else 1
a, b = pair_array[:, :lim], pair_array[:, lim:]
ab = a - b
dist = np.sqrt(np.einsum("...i,...i", ab, ab))
return pd.DataFrame(dist * arena_abs / arena_rel)
......@@ -199,18 +201,21 @@ def align_trajectories(data: np.array, mode: str = "all") -> np.array:
center_time = (data.shape[1] - 1) // 2
angles = np.arctan2(data[:, center_time, 0], data[:, center_time, 1])
elif mode == "all":
data = data.reshape(-1, dshape[-1])
data = data.reshape(-1, dshape[-1], order="C")
angles = np.arctan2(data[:, 0], data[:, 1])
elif mode == "none":
data = data.reshape(-1, dshape[-1], order="C")
angles = np.zeros(data.shape[0])
aligned_trajs = np.zeros(data.shape)
for frame in range(data.shape[0]):
aligned_trajs[frame] = rotate(
data[frame].reshape([-1, 2]), angles[frame],
).reshape(data.shape[1:])
data[frame].reshape([-1, 2], order="C"), angles[frame],
).reshape(data.shape[1:], order="C")
if mode == "all":
aligned_trajs = aligned_trajs.reshape(dshape)
if mode == "all" or mode == "none":
aligned_trajs = aligned_trajs.reshape(dshape, order="C")
return aligned_trajs
......@@ -436,33 +441,52 @@ def climb_wall(
return climbing
def rolling_speed(dframe, typ, pause=10, rounds=5, order=1):
"""Returns the average speed over 10 frames in pixels per frame"""
s = dframe.shape[0]
if typ == "coords":
bp = dframe.shape[1] / 2 if order == 1 else dframe.shape[1]
d = 2 if order == 1 else 1
else:
bp = dframe.shape[1]
d = 1
def rolling_speed(
dframe: pd.DatetimeIndex, window: int = 10, rounds: int = 10, deriv: int = 1
) -> pd.DataFrame:
"""Returns the average speed over n frames in pixels per frame
Parameters:
- dframe (pandas.DataFrame): position over time dataframe
- pause (int): frame-length of the averaging window
- rounds (int): float rounding decimals
- deriv (int): position derivative order; 1 for speed,
2 for acceleration, 3 for jerk, etc
distances = np.linalg.norm(
np.array(dframe).reshape(s, int(bp), d)
- np.array(dframe.shift()).reshape(s, int(bp), d),
axis=2,
Returns:
- speeds (pd.DataFrame): containing 2D speeds for each body part
in the original data or their consequent derivatives"""
original_shape = dframe.shape
body_parts = dframe.columns.levels[0]
speeds = pd.DataFrame
for der in range(deriv):
distances = np.concatenate(
[
np.array(dframe).reshape([-1, (2 if der == 0 else 1)], order="F"),
np.array(dframe.shift()).reshape(
[-1, (2 if der == 0 else 1)], order="F"
),
],
axis=1,
)
distances = np.array(compute_dist(distances))
distances = distances.reshape(
[original_shape[0], original_shape[1] // 2], order="F"
)
distances = pd.DataFrame(distances, index=dframe.index)
speeds = np.round(distances.rolling(pause).mean(), rounds)
speeds = np.round(distances.rolling(window).mean(), rounds)
speeds[np.isnan(speeds)] = 0.0
dframe = speeds
speeds.columns = body_parts
return speeds
def huddle(pos_dict, fnum, tol, tol2, mouse="B"):
def huddle(pos_dict, tol, tol2, mouse="B"):
"""Returns true when the specified mouse is huddling"""
return (
......@@ -700,8 +724,8 @@ def Tag_video(
)
# Compute speed on a rolling window
tagdict["bspeed"] = rolling_speed(dframe["B_Center"], pause=speedpause)
tagdict["wspeed"] = rolling_speed(dframe["W_Center"], pause=speedpause)
tagdict["bspeed"] = rolling_speed(dframe["B_Center"], window=speedpause)
tagdict["wspeed"] = rolling_speed(dframe["W_Center"], window=speedpause)
if any([show, save]):
# Loop over the frames in the video
......
This diff is collapsed.
......@@ -10,6 +10,12 @@ from deepof.utils import *
import deepof.preprocess
import pytest
# AUXILIARY FUNCTIONS #
def autocorr(x, t=1):
return np.round(np.corrcoef(np.array([x[:-t], x[t:]]))[0, 1], 5)
# QUALITY CONTROL AND PREPROCESSING #
......@@ -215,6 +221,7 @@ def test_angle_trio(array):
def test_rotate(p):
assert np.allclose(rotate(p, 2 * np.pi), p)
assert np.allclose(rotate(p, np.pi), -p)
assert np.allclose(rotate(p, 0), p)
@settings(deadline=None)
......@@ -230,16 +237,18 @@ def test_rotate(p):
min_value=1, max_value=10, allow_nan=False, allow_infinity=False
),
),
mode_idx=st.integers(min_value=0, max_value=1),
mode_idx=st.integers(min_value=0, max_value=2),
)
def test_align_trajectories(data, mode_idx):
mode = ["center", "all"][mode_idx]
mode = ["center", "all", "none"][mode_idx]
aligned = align_trajectories(data, mode)
assert aligned.shape == data.shape
if mode == "center":
assert np.allclose(aligned[:, (data.shape[1] - 1) // 2, 0], 0)
elif mode == "all":
assert np.allclose(aligned[:, :, 0], 0)
elif mode == "none":
assert np.allclose(aligned, data)
@settings(deadline=None)
......@@ -305,9 +314,6 @@ def test_smooth_mult_trajectory(alpha, series):
smoothed1 = smooth_mult_trajectory(series, alpha1)
smoothed2 = smooth_mult_trajectory(series, alpha2)
def autocorr(x, t=1):
return np.round(np.corrcoef(np.array([x[:-t], x[t:]]))[0, 1], 5)
assert autocorr(smoothed1) >= autocorr(series)
assert autocorr(smoothed2) >= autocorr(series)
assert autocorr(smoothed2) <= autocorr(smoothed1)
......@@ -347,14 +353,14 @@ def test_close_single_contact(pos_dframe, tol):
index=range_indexes(min_size=5),
columns=columns(["X1", "y1", "X2", "y2", "X3", "y3", "X4", "y4"], dtype=float),
rows=st.tuples(
st.floats(min_value=1, max_value=10, allow_nan=False, allow_infinity=False),
st.floats(min_value=1, max_value=10, allow_nan=False, allow_infinity=False),
st.floats(min_value=1, max_value=10, allow_nan=False, allow_infinity=False),
st.floats(min_value=1, max_value=10, allow_nan=False, allow_infinity=False),
st.floats(min_value=1, max_value=10, allow_nan=False, allow_infinity=False),
st.floats(min_value=1, max_value=10, allow_nan=False, allow_infinity=False),
st.floats(min_value=1, max_value=10, allow_nan=False, allow_infinity=False),
st.floats(min_value=1, max_value=10, allow_nan=False, allow_infinity=False),
st.floats(min_value=1, max_value=10),
st.floats(min_value=1, max_value=10),
st.floats(min_value=1, max_value=10),
st.floats(min_value=1, max_value=10),
st.floats(min_value=1, max_value=10),
st.floats(min_value=1, max_value=10),
st.floats(min_value=1, max_value=10),
st.floats(min_value=1, max_value=10),
),
),
tol=st.floats(min_value=0.01, max_value=4.98),
......@@ -422,3 +428,43 @@ def test_climb_wall(arena, tol):
with pytest.raises(NotImplementedError):
climb_wall("", arena, prun["test"], tol1, nose="Nose")
@settings(deadline=None)
@given(
dframe=data_frames(
index=range_indexes(min_size=50),
columns=columns(["X1", "y1", "X2", "y2"], dtype=float),
rows=st.tuples(
st.floats(min_value=1, max_value=10),
st.floats(min_value=1, max_value=10),
st.floats(min_value=1, max_value=10),
st.floats(min_value=1, max_value=10),
),
),
sampler=st.data(),
)
def test_rolling_speed(dframe, sampler):
dframe *= np.random.uniform(0, 1, dframe.shape)
order1 = sampler.draw(st.integers(min_value=1, max_value=3))
order2 = sampler.draw(st.integers(min_value=order1, max_value=3))
window2 = sampler.draw(st.integers(min_value=10, max_value=25))
idx = pd.MultiIndex.from_product(
[["bpart1", "bpart2"], ["X", "y"]], names=["bodyparts", "coords"],
)
dframe.columns = idx
speeds1 = rolling_speed(dframe, 5, 10, order1)
speeds2 = rolling_speed(dframe, 5, 10, order2)
speeds3 = rolling_speed(dframe, window2, 10, order1)
assert speeds1.shape[0] == dframe.shape[0]
assert speeds1.shape[1] == dframe.shape[1] // 2
assert np.all(np.std(speeds1) >= np.std(speeds2))
for i in range(speeds1.shape[1]):
assert autocorr(np.array(speeds1.iloc[:, i])) <= autocorr(
np.array(speeds3.iloc[:, i])
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment