diff --git a/deepof/pose_utils.py b/deepof/pose_utils.py
index 77e1bab228730a0c01360e6e19a4e486148042bc..61702e4e03a26a2a7b2b6a7b24946e3ea71470c4 100644
--- a/deepof/pose_utils.py
+++ b/deepof/pose_utils.py
@@ -590,8 +590,14 @@ def rule_based_video(
     """
 
+    # DATA OBTENTION AND PREPARATION
+    assert mode in [
+        "save",
+        "show",
+    ], "Parameter 'mode' should be one of 'save' and 'show'. See docs for details"
+
     animal_ids = coordinates._animal_ids
-    # undercond = "_" if len(animal_ids) > 1 else ""
+    undercond = "_" if len(animal_ids) > 1 else ""
 
     vid_name = re.findall("(.*?)_", tracks[vid_index])[0]
 
@@ -602,217 +608,156 @@
     )
     corners = frame_corners(h, w)
 
-    if mode in ["show", "save"]:
-
-        cap = cv2.VideoCapture(os.path.join(path, videos[vid_index]))
-        # Keep track of the frame number, to align with the tracking data
-        fnum = 0
-        writer = None
-        frame_speeds = {_id: -np.inf for _id in animal_ids} if animal_ids else -np.inf
+    cap = cv2.VideoCapture(os.path.join(path, videos[vid_index]))
+    # Keep track of the frame number, to align with the tracking data
+    fnum = 0
+    writer = None
+    frame_speeds = {_id: -np.inf for _id in animal_ids} if animal_ids else -np.inf
 
-        # Loop over the frames in the video
-        pbar = tqdm(total=min(coords.shape[0] - recog_limit, frame_limit))
-        while cap.isOpened() and fnum < frame_limit:
+    # Loop over the frames in the video
+    pbar = tqdm(total=min(coords.shape[0] - recog_limit, frame_limit))
+    while cap.isOpened() and fnum < frame_limit:
 
-            ret, frame = cap.read()
-            # if frame is read correctly ret is True
-            if not ret:  # pragma: no cover
-                print("Can't receive frame (stream end?). Exiting ...")
-                break
+        ret, frame = cap.read()
+        # if frame is read correctly ret is True
+        if not ret:  # pragma: no cover
+            print("Can't receive frame (stream end?). Exiting ...")
+            break
 
-            font = cv2.FONT_HERSHEY_COMPLEX_SMALL
+        font = cv2.FONT_HERSHEY_COMPLEX_SMALL
 
-            # Capture speeds
-            try:
-                if (
-                    list(frame_speeds.values())[0] == -np.inf
-                    or fnum % hparams["speed_pause"] == 0
-                ):
-                    for _id in animal_ids:
-                        frame_speeds[_id] = speeds[_id + "_Center"][fnum]
-            except AttributeError:
-                if frame_speeds == -np.inf or fnum % hparams["speed_pause"] == 0:
-                    frame_speeds = speeds["Center"][fnum]
-
-            # Display all annotations in the output video
-            if animal_ids:
-                if tag_dict["nose2nose"][fnum] and not tag_dict["sidebyside"][fnum]:
-                    cv2.putText(
-                        frame,
-                        "Nose-Nose",
-                        (
-                            corners["downleft"]
-                            if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
-                            else corners["downright"]
-                        ),
-                        font,
-                        1,
-                        (255, 255, 255),
-                        2,
-                    )
+        # Capture speeds
+        try:
+            if (
+                list(frame_speeds.values())[0] == -np.inf
+                or fnum % hparams["speed_pause"] == 0
+            ):
+                for _id in animal_ids:
+                    frame_speeds[_id] = speeds[_id + "_Center"][fnum]
+        except AttributeError:
+            if frame_speeds == -np.inf or fnum % hparams["speed_pause"] == 0:
+                frame_speeds = speeds["Center"][fnum]
+
+        # Display all annotations in the output video
+
+        def write_on_frame(text, pos, col=(255, 255, 255)):
+            """Partial closure over cv2.putText to avoid code repetition"""
+            return cv2.putText(frame, text, pos, font, 1, col, 2)
+
+        if len(animal_ids) > 1:
+            if tag_dict["nose2nose"][fnum] and not tag_dict["sidebyside"][fnum]:
+                write_on_frame(
+                    "Nose-Nose",
+                    (
+                        corners["downleft"]
+                        if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
+                        else corners["downright"]
+                    ),
+                )
+            if (
+                tag_dict[animal_ids[0] + "_nose2tail"][fnum]
+                and not tag_dict["sidereside"][fnum]
+            ):
+                write_on_frame("Nose-Tail", corners["downleft"])
+            if (
+                tag_dict[animal_ids[1] + "_nose2tail"][fnum]
+                and not tag_dict["sidereside"][fnum]
+            ):
+                write_on_frame("Nose-Tail", corners["downright"])
+            if tag_dict["sidebyside"][fnum]:
+                write_on_frame(
+                    "Side-side",
+                    (
+                        corners["downleft"]
+                        if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
+                        else corners["downright"]
+                    ),
+                )
+            if tag_dict["sidereside"][fnum]:
+                write_on_frame(
+                    "Side-Rside",
+                    (
+                        corners["downleft"]
+                        if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
+                        else corners["downright"]
+                    ),
+                )
+            for _id, down_pos, up_pos in zip(
+                animal_ids,
+                [corners["downleft"], corners["downright"]],
+                [corners["upleft"], corners["upright"]],
+            ):
+                if tag_dict[_id + "_climbing"][fnum]:
+                    write_on_frame("Climbing", down_pos)
                 if (
-                    tag_dict[animal_ids[0] + "_nose2tail"][fnum]
-                    and not tag_dict["sidereside"][fnum]
+                    tag_dict[_id + "_huddle"][fnum]
+                    and not tag_dict[_id + "_climbing"][fnum]
                 ):
-                    cv2.putText(
-                        frame,
-                        "Nose-Tail",
-                        corners["downleft"],
-                        font,
-                        1,
-                        (255, 255, 255),
-                        2,
-                    )
+                    write_on_frame("Huddling", down_pos)
                 if (
-                    tag_dict[animal_ids[1] + "_nose2tail"][fnum]
-                    and not tag_dict["sidereside"][fnum]
+                    tag_dict[_id + "_following"][fnum]
+                    and not tag_dict[_id + "_climbing"][fnum]
                 ):
-                    cv2.putText(
-                        frame,
-                        "Nose-Tail",
-                        corners["downright"],
-                        font,
-                        1,
-                        (255, 255, 255),
-                        2,
-                    )
-                if tag_dict["sidebyside"][fnum]:
-                    cv2.putText(
-                        frame,
-                        "Side-side",
-                        (
-                            corners["downleft"]
-                            if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
-                            else corners["downright"]
-                        ),
-                        font,
-                        1,
-                        (255, 255, 255),
-                        2,
-                    )
-                if tag_dict["sidereside"][fnum]:
-                    cv2.putText(
-                        frame,
-                        "Side-Rside",
-                        (
-                            corners["downleft"]
-                            if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
-                            else corners["downright"]
-                        ),
-                        font,
-                        1,
-                        (255, 255, 255),
-                        2,
-                    )
-                for _id, down_pos, up_pos in zip(
-                    animal_ids,
-                    [corners["downleft"], corners["downright"]],
-                    [corners["upleft"], corners["upright"]],
-                ):
-                    if tag_dict[_id + "_climbing"][fnum]:
-                        cv2.putText(
-                            frame, "Climbing", down_pos, font, 1, (255, 255, 255), 2
-                        )
-                    if (
-                        tag_dict[_id + "_huddle"][fnum]
-                        and not tag_dict[_id + "_climbing"][fnum]
-                    ):
-                        cv2.putText(
-                            frame, "Huddling", down_pos, font, 1, (255, 255, 255), 2
-                        )
-                    if (
-                        tag_dict[_id + "_following"][fnum]
-                        and not tag_dict[_id + "_climbing"][fnum]
-                    ):
-                        cv2.putText(
-                            frame,
-                            "*f",
-                            (int(w * 0.3 / 10), int(h / 10)),
-                            font,
-                            1,
-                            (
-                                (150, 150, 255)
-                                if frame_speeds[animal_ids[0]]
-                                > frame_speeds[animal_ids[1]]
-                                else (150, 255, 150)
-                            ),
-                            2,
-                        )
-                    cv2.putText(
-                        frame,
-                        _id + ": " + str(np.round(frame_speeds[_id], 2)) + " mmpf",
-                        (up_pos[0] - 20, up_pos[1]),
-                        font,
-                        1,
+                    write_on_frame(
+                        "*f",
+                        (int(w * 0.3 / 10), int(h / 10)),
                         (
                             (150, 150, 255)
-                            if frame_speeds[_id] == max(list(frame_speeds.values()))
+                            if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
                             else (150, 255, 150)
                         ),
-                        2,
-                    )
-
-            else:
-                if tag_dict["climbing"][fnum]:
-                    cv2.putText(
-                        frame,
-                        "Climbing",
-                        corners["downleft"],
-                        font,
-                        1,
-                        (255, 255, 255),
-                        2,
-                    )
-                if tag_dict["huddle"][fnum] and not tag_dict["climbing"][fnum]:
-                    cv2.putText(
-                        frame,
-                        "huddle",
-                        corners["downleft"],
-                        font,
-                        1,
-                        (255, 255, 255),
-                        2,
                     )
-                cv2.putText(
-                    frame,
-                    str(np.round(frame_speeds, 2)) + " mmpf",
-                    corners["upleft"],
-                    font,
-                    1,
+                write_on_frame(
+                    _id + ": " + str(np.round(frame_speeds[_id], 2)) + " mmpf",
+                    (up_pos[0] - 20, up_pos[1]),
                     (
                         (150, 150, 255)
-                        if hparams["huddle_speed"] > frame_speeds
+                        if frame_speeds[_id] == max(list(frame_speeds.values()))
                         else (150, 255, 150)
                     ),
-                    2,
                 )
 
-            if mode == "show":  # pragma: no cover
-                cv2.imshow("frame", frame)
+        else:
+            if tag_dict["climbing"][fnum]:
+                write_on_frame("Climbing", corners["downleft"])
+            if tag_dict["huddle"][fnum] and not tag_dict["climbing"][fnum]:
+                write_on_frame("huddle", corners["downleft"])
+            write_on_frame(
+                str(np.round(frame_speeds, 2)) + " mmpf",
+                corners["upleft"],
+                (
+                    (150, 150, 255)
+                    if hparams["huddle_speed"] > frame_speeds
+                    else (150, 255, 150)
+                ),
+            )
 
-                if cv2.waitKey(1) == ord("q"):
-                    break
+        if mode == "show":  # pragma: no cover
+            cv2.imshow("frame", frame)
 
-            if mode == "save":
+            if cv2.waitKey(1) == ord("q"):
+                break
 
-                if writer is None:
-                    # Define the codec and create VideoWriter object.The output is stored in 'outpy.avi' file.
-                    # Define the FPS. Also frame size is passed.
-                    writer = cv2.VideoWriter()
-                    writer.open(
-                        re.findall("(.*?)_", tracks[vid_index])[0] + "_tagged.avi",
-                        cv2.VideoWriter_fourcc(*"MJPG"),
-                        (hparams["fps"] if hparams["fps"] != 0 else cv2.CAP_PROP_FPS),
-                        (frame.shape[1], frame.shape[0]),
-                        True,
-                    )
+        if mode == "save":
+
+            if writer is None:
+                # Define the codec and create VideoWriter object.The output is stored in 'outpy.avi' file.
+                # Define the FPS. Also frame size is passed.
+                writer = cv2.VideoWriter()
+                writer.open(
+                    re.findall("(.*?)_", tracks[vid_index])[0] + "_tagged.avi",
+                    cv2.VideoWriter_fourcc(*"MJPG"),
+                    (hparams["fps"] if hparams["fps"] != 0 else cv2.CAP_PROP_FPS),
+                    (frame.shape[1], frame.shape[0]),
+                    True,
+                )
 
-                    print(cv2.CAP_PROP_FPS)
+                print(cv2.CAP_PROP_FPS)
 
-                writer.write(frame)
+            writer.write(frame)
 
-            pbar.update(1)
-            fnum += 1
+        pbar.update(1)
+        fnum += 1
 
-        cap.release()
-        cv2.destroyAllWindows()
+    cap.release()
+    cv2.destroyAllWindows()