Commit 8d757e4c authored by lucas_miranda's avatar lucas_miranda
Browse files

Refactored pose_utils.py

parent 38fce63d
Pipeline #83177 failed with stage
in 23 minutes and 13 seconds
......@@ -590,8 +590,14 @@ def rule_based_video(
"""
# DATA OBTENTION AND PREPARATION
assert mode in [
"save",
"show",
], "Parameter 'mode' should be one of 'save' and 'show'. See docs for details"
animal_ids = coordinates._animal_ids
# undercond = "_" if len(animal_ids) > 1 else ""
undercond = "_" if len(animal_ids) > 1 else ""
vid_name = re.findall("(.*?)_", tracks[vid_index])[0]
......@@ -602,217 +608,156 @@ def rule_based_video(
)
corners = frame_corners(h, w)
if mode in ["show", "save"]:
cap = cv2.VideoCapture(os.path.join(path, videos[vid_index]))
# Keep track of the frame number, to align with the tracking data
fnum = 0
writer = None
frame_speeds = {_id: -np.inf for _id in animal_ids} if animal_ids else -np.inf
cap = cv2.VideoCapture(os.path.join(path, videos[vid_index]))
# Keep track of the frame number, to align with the tracking data
fnum = 0
writer = None
frame_speeds = {_id: -np.inf for _id in animal_ids} if animal_ids else -np.inf
# Loop over the frames in the video
pbar = tqdm(total=min(coords.shape[0] - recog_limit, frame_limit))
while cap.isOpened() and fnum < frame_limit:
# Loop over the frames in the video
pbar = tqdm(total=min(coords.shape[0] - recog_limit, frame_limit))
while cap.isOpened() and fnum < frame_limit:
ret, frame = cap.read()
# if frame is read correctly ret is True
if not ret: # pragma: no cover
print("Can't receive frame (stream end?). Exiting ...")
break
ret, frame = cap.read()
# if frame is read correctly ret is True
if not ret: # pragma: no cover
print("Can't receive frame (stream end?). Exiting ...")
break
font = cv2.FONT_HERSHEY_COMPLEX_SMALL
font = cv2.FONT_HERSHEY_COMPLEX_SMALL
# Capture speeds
try:
if (
list(frame_speeds.values())[0] == -np.inf
or fnum % hparams["speed_pause"] == 0
):
for _id in animal_ids:
frame_speeds[_id] = speeds[_id + "_Center"][fnum]
except AttributeError:
if frame_speeds == -np.inf or fnum % hparams["speed_pause"] == 0:
frame_speeds = speeds["Center"][fnum]
# Display all annotations in the output video
if animal_ids:
if tag_dict["nose2nose"][fnum] and not tag_dict["sidebyside"][fnum]:
cv2.putText(
frame,
"Nose-Nose",
(
corners["downleft"]
if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
else corners["downright"]
),
font,
1,
(255, 255, 255),
2,
)
# Capture speeds
try:
if (
list(frame_speeds.values())[0] == -np.inf
or fnum % hparams["speed_pause"] == 0
):
for _id in animal_ids:
frame_speeds[_id] = speeds[_id + "_Center"][fnum]
except AttributeError:
if frame_speeds == -np.inf or fnum % hparams["speed_pause"] == 0:
frame_speeds = speeds["Center"][fnum]
# Display all annotations in the output video
def write_on_frame(text, pos, col=(255, 255, 255)):
"""Partial closure over cv2.putText to avoid code repetition"""
return cv2.putText(frame, text, pos, font, 1, col, 2)
if len(animal_ids) > 1:
if tag_dict["nose2nose"][fnum] and not tag_dict["sidebyside"][fnum]:
write_on_frame(
"Nose-Nose",
(
corners["downleft"]
if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
else corners["downright"]
),
)
if (
tag_dict[animal_ids[0] + "_nose2tail"][fnum]
and not tag_dict["sidereside"][fnum]
):
write_on_frame("Nose-Tail", corners["downleft"])
if (
tag_dict[animal_ids[1] + "_nose2tail"][fnum]
and not tag_dict["sidereside"][fnum]
):
write_on_frame("Nose-Tail", corners["downright"])
if tag_dict["sidebyside"][fnum]:
write_on_frame(
"Side-side",
(
corners["downleft"]
if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
else corners["downright"]
),
)
if tag_dict["sidereside"][fnum]:
write_on_frame(
"Side-Rside",
(
corners["downleft"]
if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
else corners["downright"]
),
)
for _id, down_pos, up_pos in zip(
animal_ids,
[corners["downleft"], corners["downright"]],
[corners["upleft"], corners["upright"]],
):
if tag_dict[_id + "_climbing"][fnum]:
write_on_frame("Climbing", down_pos)
if (
tag_dict[animal_ids[0] + "_nose2tail"][fnum]
and not tag_dict["sidereside"][fnum]
tag_dict[_id + "_huddle"][fnum]
and not tag_dict[_id + "_climbing"][fnum]
):
cv2.putText(
frame,
"Nose-Tail",
corners["downleft"],
font,
1,
(255, 255, 255),
2,
)
write_on_frame("Huddling", down_pos)
if (
tag_dict[animal_ids[1] + "_nose2tail"][fnum]
and not tag_dict["sidereside"][fnum]
tag_dict[_id + "_following"][fnum]
and not tag_dict[_id + "_climbing"][fnum]
):
cv2.putText(
frame,
"Nose-Tail",
corners["downright"],
font,
1,
(255, 255, 255),
2,
)
if tag_dict["sidebyside"][fnum]:
cv2.putText(
frame,
"Side-side",
(
corners["downleft"]
if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
else corners["downright"]
),
font,
1,
(255, 255, 255),
2,
)
if tag_dict["sidereside"][fnum]:
cv2.putText(
frame,
"Side-Rside",
(
corners["downleft"]
if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
else corners["downright"]
),
font,
1,
(255, 255, 255),
2,
)
for _id, down_pos, up_pos in zip(
animal_ids,
[corners["downleft"], corners["downright"]],
[corners["upleft"], corners["upright"]],
):
if tag_dict[_id + "_climbing"][fnum]:
cv2.putText(
frame, "Climbing", down_pos, font, 1, (255, 255, 255), 2
)
if (
tag_dict[_id + "_huddle"][fnum]
and not tag_dict[_id + "_climbing"][fnum]
):
cv2.putText(
frame, "Huddling", down_pos, font, 1, (255, 255, 255), 2
)
if (
tag_dict[_id + "_following"][fnum]
and not tag_dict[_id + "_climbing"][fnum]
):
cv2.putText(
frame,
"*f",
(int(w * 0.3 / 10), int(h / 10)),
font,
1,
(
(150, 150, 255)
if frame_speeds[animal_ids[0]]
> frame_speeds[animal_ids[1]]
else (150, 255, 150)
),
2,
)
cv2.putText(
frame,
_id + ": " + str(np.round(frame_speeds[_id], 2)) + " mmpf",
(up_pos[0] - 20, up_pos[1]),
font,
1,
write_on_frame(
"*f",
(int(w * 0.3 / 10), int(h / 10)),
(
(150, 150, 255)
if frame_speeds[_id] == max(list(frame_speeds.values()))
if frame_speeds[animal_ids[0]] > frame_speeds[animal_ids[1]]
else (150, 255, 150)
),
2,
)
else:
if tag_dict["climbing"][fnum]:
cv2.putText(
frame,
"Climbing",
corners["downleft"],
font,
1,
(255, 255, 255),
2,
)
if tag_dict["huddle"][fnum] and not tag_dict["climbing"][fnum]:
cv2.putText(
frame,
"huddle",
corners["downleft"],
font,
1,
(255, 255, 255),
2,
)
cv2.putText(
frame,
str(np.round(frame_speeds, 2)) + " mmpf",
corners["upleft"],
font,
1,
write_on_frame(
_id + ": " + str(np.round(frame_speeds[_id], 2)) + " mmpf",
(up_pos[0] - 20, up_pos[1]),
(
(150, 150, 255)
if hparams["huddle_speed"] > frame_speeds
if frame_speeds[_id] == max(list(frame_speeds.values()))
else (150, 255, 150)
),
2,
)
if mode == "show": # pragma: no cover
cv2.imshow("frame", frame)
else:
if tag_dict["climbing"][fnum]:
write_on_frame("Climbing", corners["downleft"])
if tag_dict["huddle"][fnum] and not tag_dict["climbing"][fnum]:
write_on_frame("huddle", corners["downleft"])
write_on_frame(
str(np.round(frame_speeds, 2)) + " mmpf",
corners["upleft"],
(
(150, 150, 255)
if hparams["huddle_speed"] > frame_speeds
else (150, 255, 150)
),
)
if cv2.waitKey(1) == ord("q"):
break
if mode == "show": # pragma: no cover
cv2.imshow("frame", frame)
if mode == "save":
if cv2.waitKey(1) == ord("q"):
break
if writer is None:
# Define the codec and create VideoWriter object.The output is stored in 'outpy.avi' file.
# Define the FPS. Also frame size is passed.
writer = cv2.VideoWriter()
writer.open(
re.findall("(.*?)_", tracks[vid_index])[0] + "_tagged.avi",
cv2.VideoWriter_fourcc(*"MJPG"),
(hparams["fps"] if hparams["fps"] != 0 else cv2.CAP_PROP_FPS),
(frame.shape[1], frame.shape[0]),
True,
)
if mode == "save":
if writer is None:
# Define the codec and create VideoWriter object.The output is stored in 'outpy.avi' file.
# Define the FPS. Also frame size is passed.
writer = cv2.VideoWriter()
writer.open(
re.findall("(.*?)_", tracks[vid_index])[0] + "_tagged.avi",
cv2.VideoWriter_fourcc(*"MJPG"),
(hparams["fps"] if hparams["fps"] != 0 else cv2.CAP_PROP_FPS),
(frame.shape[1], frame.shape[0]),
True,
)
print(cv2.CAP_PROP_FPS)
print(cv2.CAP_PROP_FPS)
writer.write(frame)
writer.write(frame)
pbar.update(1)
fnum += 1
pbar.update(1)
fnum += 1
cap.release()
cv2.destroyAllWindows()
cap.release()
cv2.destroyAllWindows()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment