Commit f3de9525 authored by lucas_miranda's avatar lucas_miranda
Browse files

Added tests for gmm functions in utils.py

parent 6dac3405
......@@ -15,7 +15,8 @@ from joblib import Parallel, delayed
from scipy import spatial
from scipy import stats
from sklearn import mixture
from tqdm import tqdm_notebook as tqdm
from tqdm import tqdm
from typing import Tuple, Any, List, Union
# QUALITY CONTROL AND PREPROCESSING #
......@@ -977,9 +978,9 @@ def max_behaviour(
the maximum behaviour per time slot
- stepped (bool): sliding windows don't overlap if True. False by default
Returns:
- max_array (np.array): string array with the most common behaviour per instance
of the sliding window"""
Returns:
- max_array (np.array): string array with the most common behaviour per instance
of the sliding window"""
speeds = [col for col in behaviour_dframe.columns if "speed" in col.lower()]
......@@ -995,7 +996,19 @@ def max_behaviour(
# MACHINE LEARNING FUNCTIONS #
def gmm_compute(x, n_components, cv_type):
def gmm_compute(x: np.array, n_components: int, cv_type: str) -> list:
"""Fits a Gaussian Mixture Model to the provided data and returns evaluation metrics.
Parameters:
- x (numpy.array): data matrix to train the model
- n_components (int): number of Gaussian components to use
- cv_type (str): covariance matrix type to use.
Must be one of "spherical", "tied", "diag", "full"
Returns:
- gmm_eval (list): model and associated BIC for downstream selection
"""
gmm = mixture.GaussianMixture(
n_components=n_components,
covariance_type=cv_type,
......@@ -1003,19 +1016,36 @@ def gmm_compute(x, n_components, cv_type):
init_params="kmeans",
)
gmm.fit(x)
return [gmm, gmm.bic(x)]
gmm_eval = [gmm, gmm.bic(x)]
return gmm_eval
def gmm_model_selection(
x: np.array,
n_components_range: range,
part_size: int,
n_runs: int = 100,
n_cores: int = False,
cv_types: Tuple = ("spherical", "tied", "diag", "full"),
) -> Tuple[List[list], List[np.ndarray], Union[int, Any]]:
"""Runs GMM clustering model selection on the specified X dataframe, outputs the bic distribution per model,
a vector with the median BICs and an object with the overall best model
Parameters:
- x (numpy.array): data matrix to train the models
- n_components_range (range): generator with numbers of components to evaluate
- n_runs (int): number of bootstraps for each model
- part_size (int): size of bootstrap samples for each model
- n_cores (int): number of cores to use for computation
- cv_types (tuple): Covariance Matrices to try. All four available by default
def GMM_Model_Selection(
X,
n_components_range,
n_runs=100,
part_size=10000,
n_cores=False,
cv_types=["spherical", "tied", "diag", "full"],
):
"""Runs GMM clustering model selection on the specified X dataframe, outputs the bic distribution per model,
a vector with the median BICs and an object with the overall best model"""
Returns:
- bic (list): All recorded BIC values for all attempted parameter combinations
(useful for plotting)
- m_bic(list): All minimum BIC values recorded throughout the process
(useful for plottinh)
- best_bic_gmm (sklearn.GMM): unfitted version of the best found model
"""
# Set the default of n_cores to the most efficient value
if not n_cores:
......@@ -1024,6 +1054,7 @@ def GMM_Model_Selection(
bic = []
m_bic = []
lowest_bic = np.inf
best_bic_gmm = 0
pbar = tqdm(total=len(cv_types) * len(n_components_range))
......@@ -1032,8 +1063,10 @@ def GMM_Model_Selection(
for n_components in n_components_range:
res = Parallel(n_jobs=n_cores, prefer="threads")(
delayed(gmm_compute)(X.sample(part_size), n_components, cv_type)
for i in range(n_runs)
delayed(gmm_compute)(
x.sample(part_size, replace=True), n_components, cv_type
)
for _ in range(n_runs)
)
bic.append([i[1] for i in res])
......@@ -1045,7 +1078,8 @@ def GMM_Model_Selection(
return bic, m_bic, best_bic_gmm
##### RESULT ANALYSIS FUNCTIONS #####
# RESULT ANALYSIS FUNCTIONS #
def cluster_transition_matrix(
......@@ -1088,7 +1122,8 @@ def cluster_transition_matrix(
return trans_normed
##### PLOTTING FUNCTIONS #####
# PLOTTING FUNCTIONS #
def plot_speed(Behaviour_dict, Treatments):
......
......@@ -632,18 +632,67 @@ def test_single_behaviour_analysis(sampler):
),
),
window_size=st.data(),
stepped=st.booleans(),
)
def test_max_behaviour(behaviour_dframe, window_size):
def test_max_behaviour(behaviour_dframe, window_size, stepped):
wsize1 = window_size.draw(st.integers(min_value=5, max_value=50))
wsize2 = window_size.draw(st.integers(min_value=wsize1, max_value=50))
maxbe1 = max_behaviour(behaviour_dframe, wsize1)
maxbe2 = max_behaviour(behaviour_dframe, wsize2)
maxbe1 = max_behaviour(behaviour_dframe, wsize1, stepped)
maxbe2 = max_behaviour(behaviour_dframe, wsize2, stepped)
assert type(maxbe1) == np.ndarray
assert type(maxbe2) == np.ndarray
assert type(maxbe1[wsize1 // 2 + 1]) == str
assert type(maxbe1[wsize2 // 2 + 1]) == str
assert maxbe1[wsize1 // 2 + 1] in behaviour_dframe.columns
assert maxbe2[wsize2 // 2 + 1] in behaviour_dframe.columns
assert len(maxbe1) >= len(maxbe2)
if not stepped:
assert type(maxbe1[wsize1 // 2 + 1]) == str
assert type(maxbe1[wsize2 // 2 + 1]) == str
assert maxbe1[wsize1 // 2 + 1] in behaviour_dframe.columns
assert maxbe2[wsize2 // 2 + 1] in behaviour_dframe.columns
assert len(maxbe1) >= len(maxbe2)
@settings(
deadline=None, suppress_health_check=[HealthCheck.too_slow],
)
@given(
x=arrays(
dtype=float,
shape=st.tuples(
st.integers(min_value=10, max_value=1000),
st.integers(min_value=10, max_value=1000),
),
elements=st.floats(min_value=1.0, max_value=1.0,),
).map(lambda x: x * np.random.uniform(0, 2, x.shape)),
n_components=st.integers(min_value=1, max_value=10),
cv_type=st.integers(min_value=0, max_value=3),
)
def test_gmm_compute(x, n_components, cv_type):
cv_type = ["spherical", "tied", "diag", "full"][cv_type]
assert len(gmm_compute(x, n_components, cv_type)) == 2
@settings(
deadline=None, suppress_health_check=[HealthCheck.too_slow],
)
@given(
x=arrays(
dtype=float,
shape=st.tuples(
st.integers(min_value=10, max_value=1000),
st.integers(min_value=10, max_value=1000),
),
elements=st.floats(min_value=1.0, max_value=1.0,),
).map(lambda x: x * np.random.uniform(0, 2, x.shape)),
sampler=st.data(),
)
def test_gmm_model_selection(x, sampler):
n_component_range = range(1, sampler.draw(st.integers(min_value=2, max_value=5)))
part_size = sampler.draw(
st.integers(min_value=x.shape[0] // 2, max_value=x.shape[0] * 2)
)
assert (
len(
gmm_model_selection(pd.DataFrame(x), n_component_range, part_size, n_runs=1)
)
== 3
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment