Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify | |
| import cv2 | |
| import numpy as np | |
| import tensorflow as tf | |
| from transformers import BlipProcessor, BlipForConditionalGeneration, CLIPProcessor, CLIPModel | |
| import torch | |
| import os | |
| import requests | |
| from tempfile import NamedTemporaryFile | |
| import gc | |
| import tensorflow_hub as hub | |
| import logging | |
| from PIL import Image | |
# Root logger is configured so that only ERROR and above are emitted.
logging.basicConfig(level=logging.ERROR)

# Direct the Hugging Face caches to /app/cache.
# NOTE(review): these variables are assigned after `transformers` has already
# been imported at the top of the file; confirm the library still picks them
# up at that point, otherwise move the assignment above the imports.
os.environ.update({
    'TRANSFORMERS_CACHE': '/app/cache',
    'HF_HOME': '/app/cache',
})

# Directory checked for a local MoveNet SavedModel before falling back to
# TensorFlow Hub.
movenet_model_path = '/models/movenet/movenet_lightning'
# Body-part name -> index into the per-frame keypoint list that
# process_video obtains from MoveNet. The position of each name in the
# tuple below is its index.
KEYPOINT_DICT = {
    part: position
    for position, part in enumerate((
        'nose',
        'left_eye',
        'right_eye',
        'left_ear',
        'right_ear',
        'left_shoulder',
        'right_shoulder',
        'left_elbow',
        'right_elbow',
        'left_wrist',
        'right_wrist',
        'left_hip',
        'right_hip',
        'left_knee',
        'right_knee',
        'left_ankle',
        'right_ankle',
    ))
}
# The Flask application object; it is started by the __main__ guard at the
# bottom of the file.
app = Flask(import_name=__name__)
# Number of leading frames sampled from the video.
_MAX_FRAMES = 60
# Square input side expected by MoveNet SinglePose Lightning (Thunder uses 256).
_MOVENET_INPUT_SIZE = 192
# MoveNet keypoints are [y, x, score], normalized to the model input frame.
_KP_Y, _KP_X = 0, 1
# Maximum vertical (normalized) wrist-to-nose distance counted as "guard up".
_GUARD_THRESHOLD = 0.1
# Seconds to wait on connect/read before abandoning the video download.
_DOWNLOAD_TIMEOUT_S = 30


def _download_video(video_url):
    """Download *video_url* to a temporary .mp4; return its path, or None on a non-200 reply."""
    # NOTE(security): the URL comes straight from the request body, so this
    # fetch can be pointed at internal hosts; restrict it if that matters.
    reply = requests.get(video_url, timeout=_DOWNLOAD_TIMEOUT_S)
    if reply.status_code != 200:
        return None
    # The file is only created after a successful download, so a failed
    # download cannot leave a stray temp file behind.
    with NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video_file:
        temp_video_file.write(reply.content)
        return temp_video_file.name


def _read_rgb_frames(video_path, max_frames=_MAX_FRAMES):
    """Return up to *max_frames* leading frames of the video as RGB arrays."""
    cap = cv2.VideoCapture(video_path)
    rgb_frames = []
    try:
        while len(rgb_frames) < max_frames:
            success, frame = cap.read()
            if not success:
                break
            # OpenCV decodes to BGR; every model below wants RGB, so the
            # conversion is done once here.
            rgb_frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        cap.release()
    return rgb_frames


def _classify_stance(keypoints):
    """Label one frame 'orthodox', 'southpaw' or 'unknown' from ankle/wrist x positions."""
    left_ankle = keypoints[KEYPOINT_DICT['left_ankle']]
    right_ankle = keypoints[KEYPOINT_DICT['right_ankle']]
    left_wrist = keypoints[KEYPOINT_DICT['left_wrist']]
    right_wrist = keypoints[KEYPOINT_DICT['right_wrist']]
    if right_ankle[_KP_X] < left_ankle[_KP_X] and right_wrist[_KP_X] < left_wrist[_KP_X]:
        return "orthodox"
    if left_ankle[_KP_X] < right_ankle[_KP_X] and left_wrist[_KP_X] < right_wrist[_KP_X]:
        return "southpaw"
    return "unknown"


def _is_guard_up(keypoints):
    """Return True when both wrists are within the guard threshold of nose height."""
    nose_y = keypoints[KEYPOINT_DICT['nose']][_KP_Y]
    left_y = keypoints[KEYPOINT_DICT['left_wrist']][_KP_Y]
    right_y = keypoints[KEYPOINT_DICT['right_wrist']][_KP_Y]
    return abs(left_y - nose_y) < _GUARD_THRESHOLD and abs(right_y - nose_y) < _GUARD_THRESHOLD


def _analyze_pose(rgb_frames):
    """Run MoveNet on each frame; return (keypoints per frame, stances, guard flags)."""
    # Prefer the local SavedModel; otherwise fetch the model from TensorFlow Hub.
    if os.path.exists(movenet_model_path):
        movenet_model = tf.saved_model.load(movenet_model_path)
    else:
        movenet_model = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4")
    infer = movenet_model.signatures['serving_default']

    movenet_results, stances, guard_up = [], [], []
    for rgb in rgb_frames:
        input_tensor = tf.image.resize_with_pad(
            tf.convert_to_tensor(rgb, dtype=tf.uint8),
            _MOVENET_INPUT_SIZE,
            _MOVENET_INPUT_SIZE,
        )
        # The model takes an int32 batch of one image.
        input_tensor = tf.expand_dims(tf.cast(input_tensor, dtype=tf.int32), axis=0)
        # output_0 has shape (1, 1, 17, 3): strip the batch and person axes
        # so that indexing by KEYPOINT_DICT yields a [y, x, score] triple.
        keypoints = infer(input_tensor)['output_0'][0][0].numpy().tolist()
        movenet_results.append(keypoints)
        stances.append(_classify_stance(keypoints))
        guard_up.append(_is_guard_up(keypoints))

    # Drop the model before the next ones are loaded.
    del infer, movenet_model
    gc.collect()
    return movenet_results, stances, guard_up


def _caption_frames(rgb_frames, device):
    """Return one BLIP caption per frame."""
    blip_model = BlipForConditionalGeneration.from_pretrained('Salesforce/blip-image-captioning-base').to(device)
    blip_processor = BlipProcessor.from_pretrained('Salesforce/blip-image-captioning-base')
    captions = []
    for rgb in rgb_frames:
        inputs = blip_processor(images=Image.fromarray(rgb), return_tensors="pt").to(device)
        with torch.no_grad():
            token_ids = blip_model.generate(**inputs)
        captions.append(blip_processor.decode(token_ids[0], skip_special_tokens=True))

    # Free the BLIP weights before CLIP is loaded.
    del blip_model, blip_processor
    torch.cuda.empty_cache()
    gc.collect()
    return captions


def _clip_similarities(rgb_frames, stances, height, weight, wingspan, device):
    """Return the CLIP cosine similarity of each frame to its stance-specific jab prompt."""
    clip_model = CLIPModel.from_pretrained('openai/clip-vit-base-patch32').to(device)
    clip_processor = CLIPProcessor.from_pretrained('openai/clip-vit-base-patch32')

    # The prompt only varies with the stance label, so its text features are
    # computed once per distinct stance instead of once per frame.
    text_features_by_stance = {}
    clip_results = []
    for rgb, stance in zip(rgb_frames, stances):
        if stance not in text_features_by_stance:
            prompt = f"A person performing a Muay Thai jab in {stance} stance at {height} in in height, {weight} lbs in weight, and a wingspan of {wingspan} cm."
            text_inputs = clip_processor(text=[prompt], return_tensors="pt").to(device)
            with torch.no_grad():
                text_features_by_stance[stance] = clip_model.get_text_features(**text_inputs)
        image_inputs = clip_processor(images=Image.fromarray(rgb), return_tensors="pt").to(device)
        with torch.no_grad():
            image_features = clip_model.get_image_features(**image_inputs)
        similarity = torch.nn.functional.cosine_similarity(
            image_features, text_features_by_stance[stance]
        )
        clip_results.append(similarity.item())

    del clip_model, clip_processor, text_features_by_stance
    torch.cuda.empty_cache()
    gc.collect()
    return clip_results


# NOTE(review): no @app.route(...) decorator is visible on this view, so
# nothing in the visible source registers it with `app`; confirm where the
# route is bound.
def process_video():
    """Analyse a Muay Thai jab video referenced by the JSON request body.

    Expected JSON fields:
        videoURL: URL the video is downloaded from.
        height, weight, wingspan: athlete measurements; they are interpolated
            into the CLIP prompt as inches, lbs and cm respectively.

    The first 60 frames are run through MoveNet (keypoints, stance, guard),
    BLIP (captions) and CLIP (similarity to a jab prompt). The models are
    loaded and released one after another to limit peak memory.

    Returns:
        A JSON response with ``movenet_results``, ``blip_captions``,
        ``clip_similarities``, ``stances``, ``overall_score`` (0-10) and
        ``guard_score`` (fraction of frames with the guard up).
        A 400 JSON error is returned for missing fields, a failed download
        or a video with no readable frames; any other failure yields a 500
        JSON error carrying the exception text.
    """
    try:
        # Clear previous cache
        gc.collect()
        torch.cuda.empty_cache()

        # A missing, malformed or non-object body is treated as "no fields"
        # so the client receives a 400 instead of a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        video_url = payload.get('videoURL')
        height = payload.get('height')
        weight = payload.get('weight')
        wingspan = payload.get('wingspan')
        if not video_url:
            return jsonify({"error": "No video URL provided"}), 400
        if not all([height, weight, wingspan]):
            return jsonify({"error": "Height, weight, and wingspan are required"}), 400

        video_path = _download_video(video_url)
        if video_path is None:
            return jsonify({"error": "Failed to download video from the provided URL"}), 400
        try:
            rgb_frames = _read_rgb_frames(video_path)
        finally:
            # The temp file is removed even if decoding raises.
            os.remove(video_path)
        if not rgb_frames:
            return jsonify({"error": "No frames could be read from the video"}), 400

        movenet_results, stances, guard_up = _analyze_pose(rgb_frames)

        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        captions = _caption_frames(rgb_frames, device)
        clip_results = _clip_similarities(rgb_frames, stances, height, weight, wingspan, device)

        # Overall score: mean of average CLIP similarity and guard fraction,
        # scaled and clamped to 0 - 10.
        avg_clip_similarity = sum(clip_results) / len(clip_results) if clip_results else 0
        guard_score = sum(guard_up) / len(guard_up) if guard_up else 0
        overall_score = (avg_clip_similarity + guard_score) / 2
        overall_score = max(0, min(overall_score * 10, 10))

        response = {
            "movenet_results": movenet_results,
            "blip_captions": captions,
            "clip_similarities": clip_results,
            "stances": stances,
            "overall_score": overall_score,
            "guard_score": guard_score
        }
        return jsonify(response)
    except Exception as e:
        # Top-level boundary: log with traceback and report the failure as JSON.
        logging.exception("process_video failed: %s", e)
        return jsonify({"error": str(e)}), 500
def _serve():
    """Release cached memory, then start the Flask server on 0.0.0.0:7860."""
    # Clear any cache before starting the Flask server
    gc.collect()
    torch.cuda.empty_cache()
    # Start the Flask app
    app.run(host='0.0.0.0', port=7860)


if __name__ == '__main__':
    _serve()