# setayesh / app1.py
# digitalai's picture
# Rename app.py to app1.py
# 909b2b0
import copy
from abc import ABC
from multiprocessing import Queue, Process
from typing import NamedTuple, List
import streamlit as st
from streamlit_webrtc import VideoProcessorBase, webrtc_streamer, WebRtcMode, ClientSettings
import av
import cv2
import numpy as np
import mediapipe as mp
from utils import CvFpsCalc
from main import draw_landmarks, draw_stick_figure
from fake_objects import FakeResultObject, FakeLandmarksObject, FakeLandmarkObject
from turn import get_ice_servers
_SENTINEL_ = "_SENTINEL_"


def pose_process(
    in_queue: Queue,
    out_queue: Queue,
    static_image_mode,
    model_complexity,
    min_detection_confidence,
    min_tracking_confidence,
):
    """Worker-process loop: pull RGB frames from ``in_queue``, run MediaPipe
    Pose on them, and push picklable landmark results to ``out_queue``.

    Runs until the shutdown sentinel string is received. ``get(timeout=10)``
    lets the process die with ``queue.Empty`` if the producer disappears
    instead of blocking forever.
    """
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(
        static_image_mode=static_image_mode,
        model_complexity=model_complexity,
        min_detection_confidence=min_detection_confidence,
        min_tracking_confidence=min_tracking_confidence,
    )
    while True:
        input_item = in_queue.get(timeout=10)
        # Frames are ndarrays, so a plain string can only be the sentinel.
        if isinstance(input_item, str) and input_item == _SENTINEL_:
            break
        results = pose.process(input_item)
        # BUG FIX: when no person is detected, results.pose_landmarks is None
        # and the original code crashed with AttributeError, killing the
        # worker permanently. Forward a "no landmarks" result instead; the
        # consumer already checks `pose_landmarks is not None`.
        if results.pose_landmarks is None:
            out_queue.put_nowait(FakeResultObject(pose_landmarks=None))
            continue
        picklable_results = FakeResultObject(pose_landmarks=FakeLandmarksObject(landmark=[
            FakeLandmarkObject(
                x=pose_landmark.x,
                y=pose_landmark.y,
                z=pose_landmark.z,
                visibility=pose_landmark.visibility,
            ) for pose_landmark in results.pose_landmarks.landmark
        ]))
        out_queue.put_nowait(picklable_results)
class Sehat(VideoProcessorBase, ABC):
    """streamlit-webrtc video processor that offloads MediaPipe Pose
    inference to a child process and renders either the annotated camera
    view, a stick-figure view, or both stacked side by side.

    Attributes set from the UI (mutable between frames):
        rev_color     -- swap foreground/background colors of the stick view.
        display_mode  -- one of the three Persian display-mode labels.
        show_fps      -- overlay an FPS counter on both views.
    """

    def __init__(self, static_image_mode,
                 model_complexity,
                 min_detection_confidence,
                 min_tracking_confidence,
                 rev_color,
                 display_mode,
                 show_fps) -> None:
        # Frames go to the inference process via _in_queue; picklable
        # landmark results come back on _out_queue.
        self._in_queue = Queue()
        self._out_queue = Queue()
        self._pose_process = Process(target=pose_process, kwargs={
            "in_queue": self._in_queue,
            "out_queue": self._out_queue,
            "static_image_mode": static_image_mode,
            "model_complexity": model_complexity,
            "min_detection_confidence": min_detection_confidence,
            "min_tracking_confidence": min_tracking_confidence,
        })
        self._cvFpsCalc = CvFpsCalc(buffer_len=10)
        self.rev_color = rev_color
        self.display_mode = display_mode
        self.show_fps = show_fps
        self._pose_process.start()

    def _infer_pose(self, image):
        """Send one RGB frame to the worker and block (<=10 s) for its result."""
        self._in_queue.put_nowait(image)
        return self._out_queue.get(timeout=10)

    def _stop_pose_process(self):
        """Ask the worker to exit and wait up to 10 s for it to finish."""
        self._in_queue.put_nowait(_SENTINEL_)
        self._pose_process.join(timeout=10)

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        """Per-frame callback: run pose inference and return the rendered view."""
        display_fps = self._cvFpsCalc.get()

        if self.rev_color:
            color = (255, 255, 255)
            bg_color = (100, 33, 3)
        else:
            color = (100, 33, 3)
            bg_color = (255, 255, 255)

        # Mirror the camera image so movements feel natural to the user.
        image = frame.to_ndarray(format="bgr24")
        image = cv2.flip(image, 1)
        debug_image01 = copy.deepcopy(image)
        debug_image02 = np.zeros((image.shape[0], image.shape[1], 3), np.uint8)
        cv2.rectangle(debug_image02, (0, 0), (image.shape[1], image.shape[0]),
                      bg_color,
                      thickness=-1)

        # MediaPipe expects RGB input.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = self._infer_pose(image)

        if results.pose_landmarks is not None:
            debug_image01 = draw_landmarks(
                debug_image01,
                results.pose_landmarks,
            )
            debug_image02 = draw_stick_figure(
                debug_image02,
                results.pose_landmarks,
                color=color,
                bg_color=bg_color,
            )

        if self.show_fps:
            cv2.putText(debug_image01, "FPS:" + str(display_fps), (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2, cv2.LINE_AA)
            cv2.putText(debug_image02, "FPS:" + str(display_fps), (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2, cv2.LINE_AA)

        if self.display_mode == "نمای حقیقی":
            return av.VideoFrame.from_ndarray(debug_image01, format="bgr24")
        elif self.display_mode == "الگوی نمایشی":
            return av.VideoFrame.from_ndarray(debug_image02, format="bgr24")
        elif self.display_mode == "نمای حقیقی و الگوی نمایشی":
            new_image = np.zeros(image.shape, dtype=np.uint8)
            h, w = image.shape[0:2]
            half_h = h // 2
            half_w = w // 2
            offset_y = h // 4
            new_image[offset_y: offset_y + half_h, 0: half_w, :] = cv2.resize(debug_image02, (half_w, half_h))
            new_image[offset_y: offset_y + half_h, half_w:, :] = cv2.resize(debug_image01, (half_w, half_h))
            return av.VideoFrame.from_ndarray(new_image, format="bgr24")
        # BUG FIX: the original fell through and returned None for any other
        # display_mode value, which streamlit-webrtc treats as an error.
        # Default to the annotated camera view.
        return av.VideoFrame.from_ndarray(debug_image01, format="bgr24")

    def __del__(self):
        # BUG FIX: __del__ can run on a partially-constructed instance or at
        # interpreter shutdown; cleanup must never raise from a finalizer.
        try:
            self._stop_pose_process()
        except Exception:
            pass
def main():
    """Streamlit entry point: render the controls and start the WebRTC stream."""
    # Model parameters are fixed; the UI that once exposed them was removed.
    static_image_mode = False
    model_complexity = 0
    min_detection_confidence = 0.5
    min_tracking_confidence = 0.5

    # UI controls (labels are user-facing Persian text — do not alter).
    display_mode = st.radio("حالت نمایش", ["الگوی نمایشی", "نمای حقیقی", "نمای حقیقی و الگوی نمایشی"], index=0)
    show_fps = st.checkbox("نمایش سرعت تصویر", value=True)

    def make_processor():
        # Factory captures the current widget values for the initial processor.
        return Sehat(
            static_image_mode=static_image_mode,
            model_complexity=model_complexity,
            min_detection_confidence=min_detection_confidence,
            min_tracking_confidence=min_tracking_confidence,
            rev_color=False,
            display_mode=display_mode,
            show_fps=show_fps,
        )

    webrtc_ctx = webrtc_streamer(
        key="الگوی نمایشی",
        mode=WebRtcMode.SENDRECV,
        rtc_configuration={"iceServers": get_ice_servers()},
        media_stream_constraints={"video": True, "audio": False},
        video_processor_factory=make_processor,
    )

    st.session_state["شروع"] = webrtc_ctx.state.playing

    # Widgets may change after the processor is created; push current values.
    if webrtc_ctx.video_processor:
        webrtc_ctx.video_processor.rev_color = False
        webrtc_ctx.video_processor.display_mode = display_mode
        webrtc_ctx.video_processor.show_fps = show_fps


if __name__ == "__main__":
    main()