Spaces:
Sleeping
Sleeping
| import cv2 | |
| import mediapipe as mp | |
| import numpy as np | |
| import threading | |
| mp_drawing = mp.solutions.drawing_utils | |
| mp_pose = mp.solutions.pose | |
| import gradio as gr | |
def detect_pose(im):
    """Detect a human pose in ``im`` and return a copy with the skeleton drawn.

    Args:
        im: Image as an H x W x 3 uint8 numpy array in RGB channel order,
            which is the order Gradio's ``Image`` component delivers and the
            order MediaPipe expects. ``None`` (no frame available yet) is
            accepted.

    Returns:
        A new array of the same shape as ``im`` with the pose landmarks and
        connections rendered on it, or ``None`` when ``im`` is ``None``.
        When no pose is detected the returned copy is unannotated.
    """
    if im is None:
        # A streaming webcam can fire before the first frame exists; there is
        # nothing to analyse, so hand back "no image" instead of crashing.
        return None

    # The frame is already RGB, so it goes to MediaPipe unchanged. The previous
    # BGR->RGB round trip (copied from an OpenCV capture loop) swapped the
    # channels and made MediaPipe analyse a red/blue-inverted picture.
    frame = np.ascontiguousarray(im)

    # NOTE: a fresh Pose instance per call keeps the function stateless, so no
    # tracking state is carried over between frames.
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        results = pose.process(frame)

    # Draw on a copy so the caller's array is never mutated.
    annotated = frame.copy()
    if results.pose_landmarks is not None:
        mp_drawing.draw_landmarks(
            annotated,
            results.pose_landmarks,
            mp_pose.POSE_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(51, 255, 51), thickness=2, circle_radius=2),
            mp_drawing.DrawingSpec(color=(51, 153, 255), thickness=2, circle_radius=2),
        )
    return annotated
# Latest output of detect_pose(); stays None until update_pose_results() runs.
pose_results = None


def update_pose_results(im):
    """Run ``detect_pose`` on ``im`` and store its result in ``pose_results``.

    Args:
        im: Image forwarded unchanged to ``detect_pose``.

    Returns:
        None. The result is published through the module-level
        ``pose_results`` variable.
    """
    global pose_results
    latest = detect_pose(im)
    pose_results = latest
def _annotate_stream_frame(frame):
    """Return the pose-annotated version of one streamed webcam frame.

    Args:
        frame: Image delivered by the streaming ``gr.Image`` input, or
            ``None`` when no frame is available.

    Returns:
        The image returned by ``detect_pose``, or ``None`` for an empty frame
        so the output component simply shows nothing.
    """
    if frame is None:
        return None
    return detect_pose(frame)


with gr.Blocks() as app:
    gr.Markdown("# Webcam Testing")
    with gr.Row():
        inp = gr.Image(source="webcam", streaming=True)
        out = gr.Image()

    # Components' ``.value`` attributes are only their construction-time
    # defaults, so background threads polling ``inp.value`` / assigning
    # ``out.value`` never see webcam frames or update the page (and busy-spin
    # while doing so). Live data flows through event listeners instead: every
    # streamed frame is passed to the handler and its return value is sent to
    # ``out``.
    inp.stream(_annotate_stream_frame, inputs=inp, outputs=out)

app.launch(debug=True)