Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify, redirect, url_for, render_template, session, Response | |
| import firebase_admin | |
| from firebase_admin import credentials, auth, db | |
| # from workouts.bicepCurls import bicepcurls | |
| import os | |
| import time | |
| import mediapipe as mp | |
| import cv2 | |
| import numpy as np | |
# Date stamp (YYYY-MM-DD) captured once, when this module is imported; home()
# uses it as a session key.
# NOTE(review): the value is never refreshed, so a process that runs past
# midnight keeps using the start date - confirm whether that is intended.
today_date = time.strftime("%Y-%m-%d")

# MediaPipe pose model and drawing helpers shared by the video processors.
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# Rep counts written by the video processors, keyed by exercise name.
curr_track = {}

# Initialize Flask app
app = Flask(__name__)
# The session-signing key can now be supplied through FLASK_SECRET_KEY.  The
# historical literal remains the fallback so existing deployments keep working,
# but it is short and public - set the environment variable in production.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', '9177')

# Firebase Admin SDK Initialization
cred = credentials.Certificate("serviceAccountKey.json")
firebase_admin.initialize_app(cred, {
    'databaseURL': 'https://fitnessdb-c9b11-default-rtdb.firebaseio.com/'
})
def calculate_angle(a, b, c):
    """Return the angle at vertex ``b`` between points ``a`` and ``c``, in degrees.

    Only the first two components (x, y) of each point are read.  The result
    is folded so that it always lies in the range 0..180.
    """
    first, vertex, last = np.array(a), np.array(b), np.array(c)

    # Direction of each arm of the angle, measured from the vertex.
    heading_to_last = np.arctan2(last[1] - vertex[1], last[0] - vertex[0])
    heading_to_first = np.arctan2(first[1] - vertex[1], first[0] - vertex[0])

    degrees = np.abs((heading_to_last - heading_to_first) * 180.0 / np.pi)
    # A reflex angle is reported as its complement to a full turn.
    return 360 - degrees if degrees > 180.0 else degrees
# Global flag to control video stream: stop_stream() sets it to True and
# process_bicep_curls() checks it before reading each frame.
stop_processing = False
def is_shoulder_press_correct(landmarks, mp_pose):
    """Classify the left-arm posture of a shoulder press.

    Args:
        landmarks: Indexable collection of pose landmarks; each item is read
            through its ``x`` and ``y`` attributes.
        mp_pose: Object exposing ``PoseLandmark`` with ``LEFT_SHOULDER``,
            ``LEFT_ELBOW`` and ``LEFT_WRIST`` members (each with ``.value``).

    Returns:
        A ``(feedback_text, is_correct)`` tuple.
    """
    joints = mp_pose.PoseLandmark

    def xy(joint):
        point = landmarks[joint.value]
        return [point.x, point.y]

    # Only the left arm is inspected.
    shoulder = xy(joints.LEFT_SHOULDER)
    elbow = xy(joints.LEFT_ELBOW)
    wrist = xy(joints.LEFT_WRIST)

    # Angle at the elbow (shoulder, elbow, wrist)
    elbow_angle = calculate_angle(shoulder, elbow, wrist)

    # A smaller y is treated as "higher": the wrist must be above the elbow
    # and the elbow above the shoulder.
    stacked_vertically = wrist[1] < elbow[1] < shoulder[1]
    if not stacked_vertically:
        return "Shoulder Press: Incorrect - Alignment", False

    # The arm has to be close to fully extended.
    if not 160 <= elbow_angle <= 180:
        return "Shoulder Press: Incorrect - Elbow angle", False

    return "Shoulder Press: Correct", True
# Video processing function
def shoulder_press_count(video_path):
    """Stream annotated frames of a shoulder-press video and count repetitions.

    This is a generator: every processed frame is yielded as one part of a
    ``multipart/x-mixed-replace`` body (boundary ``frame``) containing a JPEG.
    A repetition is counted each time the posture goes from "correct" back to
    "not correct".

    When the generator ends - because the video is exhausted or because the
    consumer closed it early - the capture is released and the rep count is
    stored in ``curr_track['shoulderpress']``.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
    """
    capture = cv2.VideoCapture(video_path)
    count = 0
    rep_started = False

    try:
        with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
            while capture.isOpened():
                ok, frame = capture.read()
                if not ok:
                    break

                # Convert the frame to RGB and mark it read-only while the
                # pose model processes it.
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                rgb.flags.writeable = False
                results = pose.process(rgb)

                # Convert back to BGR for rendering
                rgb.flags.writeable = True
                image = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)

                if results.pose_landmarks:
                    landmarks = results.pose_landmarks.landmark
                    feedback, is_correct = is_shoulder_press_correct(landmarks, mp_pose)

                    # A rep starts on the first correct frame and is counted
                    # when the posture stops being correct again.
                    if is_correct and not rep_started:
                        rep_started = True
                    elif not is_correct and rep_started:
                        rep_started = False
                        count += 1

                    # Display feedback (green when correct, red otherwise)
                    cv2.putText(image, feedback, (50, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 1,
                                (0, 255, 0) if is_correct else (0, 0, 255), 2, cv2.LINE_AA)
                    # Display rep count
                    cv2.putText(image, f'Reps: {count}', (50, 100),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                    # Draw landmarks
                    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
                else:
                    # Warn if no landmarks are detected
                    cv2.putText(image, "No body detected", (50, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

                # Encode frame for streaming; skip the frame if encoding fails
                # rather than sending a broken part.
                encoded, buffer = cv2.imencode('.jpg', image)
                if not encoded:
                    continue
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
    finally:
        # Runs on normal completion and when the generator is closed early
        # (e.g. the client drops the connection), so the capture handle is
        # always freed and the count reached so far is still recorded.
        capture.release()
        curr_track['shoulderpress'] = count
        print("shoulder_press count :", count)
def process_bicep_curls(video_path):
    """Stream annotated frames of a bicep-curl video and count repetitions.

    This is a generator: every processed frame is yielded as one part of a
    ``multipart/x-mixed-replace`` body (boundary ``frame``) containing a JPEG.
    Half a repetition is added each time the left elbow angle reaches either
    end of the 50..160 degree range, alternating between the two ends.

    The loop ends when the video is exhausted or when the module-level
    ``stop_processing`` flag becomes true.  The flag is cleared when the
    stream starts, so a stop request aimed at an earlier stream does not
    terminate this one immediately.  On exit the capture is released and the
    count is stored in ``curr_track['bicepcurls']``.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
    """
    global stop_processing  # Access the global flag to stop the stream
    stop_processing = False

    capture = cv2.VideoCapture(video_path)
    count = 0
    movement_dir = 0  # 0: waiting for the 100% end, 1: waiting for the 0% end

    try:
        # Initialize MediaPipe Pose
        with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
            while capture.isOpened():
                if stop_processing:  # If stop_processing flag is set, break the loop
                    break

                ok, frame = capture.read()
                if not ok:
                    break

                # All overlay coordinates below assume this frame size.
                frame = cv2.resize(frame, (1280, 720))

                # The pose model gets an RGB, read-only copy; all drawing is
                # done on the BGR ``frame`` itself.
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                rgb.flags.writeable = False
                results = pose.process(rgb)

                if results.pose_landmarks:
                    landmarks = results.pose_landmarks.landmark

                    # Extract relevant joints (left arm)
                    shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x,
                                landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y]
                    elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x,
                             landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y]
                    wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].x,
                             landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].y]

                    # Calculate elbow angle
                    elbow_angle = calculate_angle(shoulder, elbow, wrist)

                    # Map the angle onto a percentage and onto the y pixel of
                    # the progress bar's top edge.
                    progress_percentage = np.interp(elbow_angle, (50, 160), (0, 100))
                    progress_bar = np.interp(elbow_angle, (50, 160), (650, 100))

                    color = (255, 0, 255)
                    if progress_percentage == 100:
                        color = (0, 255, 0)
                        if movement_dir == 0:
                            count += 0.5
                            movement_dir = 1
                    if progress_percentage == 0:
                        color = (0, 0, 255)
                        if movement_dir == 1:
                            count += 0.5
                            movement_dir = 0

                    # Draw Progress Bar
                    cv2.rectangle(frame, (1100, 100), (1175, 650), color, 3)
                    cv2.rectangle(frame, (1100, int(progress_bar)), (1175, 650), color, cv2.FILLED)
                    cv2.putText(frame, f'{int(progress_percentage)}%', (1100, 75),
                                cv2.FONT_HERSHEY_PLAIN, 4, color, 4)

                    # Draw Counter
                    cv2.rectangle(frame, (0, 450), (250, 720), (0, 255, 0), cv2.FILLED)
                    cv2.putText(frame, str(int(count)), (45, 670),
                                cv2.FONT_HERSHEY_PLAIN, 15, (255, 0, 0), 30)

                    # Draw landmarks
                    mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

                # Draw Finish Button on the video
                cv2.rectangle(frame, (10, 10), (150, 60), (0, 0, 255), -1)  # Red button
                cv2.putText(frame, "FINISH", (20, 45), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

                # Encode the frame for streaming; skip it if encoding fails.
                encoded, buffer = cv2.imencode('.jpg', frame)
                if not encoded:
                    continue
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
    finally:
        # Runs on normal completion, on a stop request, and when the generator
        # is closed early, so the capture is always released and the count
        # reached so far is recorded.
        capture.release()
        curr_track['bicepcurls'] = count
# Flask route to stop the video feed
def stop_stream():
    """Set the module-level ``stop_processing`` flag and acknowledge.

    Returns:
        The tuple ``("Stream stopped", 200)``.
    """
    global stop_processing
    stop_processing = True  # the bicep-curl stream checks this flag every frame
    body, status = "Stream stopped", 200
    return body, status
def index():
    """Render and return the ``index.html`` template."""
    page = render_template('index.html')
    return page
# Registration Route
def register():
    """Create a Firebase Auth user and store the profile in the Realtime Database.

    Expects a JSON object with ``name``, ``email``, ``password``, ``age``,
    ``height``, ``weight``, ``maxPullups``, ``minPullups`` and ``gender``.
    The optional keys ``pushups``, ``squats``, ``pullups``, ``shoulderpress``
    and ``bicepcurls`` seed the progress record and default to empty lists.

    Side effects: resets ``session['recommended_plan']``, stores the initial
    progress record in ``session['progress']``, and writes the profile to
    ``users/<uid>``.  The password is passed to Firebase Auth only; it is not
    written to the database.

    Returns:
        ``(json, 200)`` on success; ``(json, 400)`` when the body is not a
        JSON object, a required field is missing, or Firebase raises.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    required_fields = ('name', 'email', 'password', 'age', 'height', 'weight',
                       'maxPullups', 'minPullups', 'gender')
    missing = [field for field in required_fields if field not in data]
    if missing:
        # Report bad input as a 400 instead of letting a KeyError become a 500.
        return jsonify({"error": f"Missing required fields: {', '.join(missing)}"}), 400

    session['recommended_plan'] = ""

    # NOTE(review): this record uses the key 'shoulder_press' while the
    # request field is 'shoulderpress'; the key is left unchanged because
    # readers of the stored record are not visible here.
    track = {
        "pushups": data.get('pushups', []),
        "squats": data.get('squats', []),
        "pullups": data.get('pullups', []),
        'shoulder_press': data.get('shoulderpress', []),
        'bicepcurls': data.get('bicepcurls', []),
        'recommended_plan': "",
    }
    session['progress'] = track

    try:
        # Create the user in Firebase Authentication
        user = auth.create_user(
            email=data['email'],
            password=data['password'],
            display_name=data['name'],
        )

        # Store additional user data in Firebase Realtime Database
        db.reference(f'users/{user.uid}').set({
            'name': data['name'],
            'email': data['email'],
            'age': data['age'],
            'height': data['height'],
            'weight': data['weight'],
            'maxPullups': data['maxPullups'],
            'minPullups': data['minPullups'],
            'gender': data['gender'],
            'progress': track,
        })
    except Exception as e:
        # Request boundary: surface the Firebase error text to the client.
        return jsonify({"error": str(e)}), 400

    return jsonify({"message": "User registered successfully!"}), 200
def login():
    """Look up a user by e-mail and load their record into the session.

    Expects a JSON object with ``email`` and ``password``.

    SECURITY NOTE(review): the submitted password is NOT verified.
    ``auth.get_user_by_email`` only confirms that an account with that e-mail
    exists, so anyone who knows a registered e-mail can log in.  Password
    verification has to happen elsewhere (for example in the Firebase client
    SDK, with this endpoint checking the resulting ID token) - confirm and fix.

    Side effects on success: ``session['user_details']`` holds the stored
    profile and ``session['progress']`` its ``progress`` entry (``{}`` when
    absent).

    Returns:
        ``{"message": "sucess"}`` on success (spelling kept because clients
        may compare against it); ``(json, 401)`` when the body is unusable,
        the lookup fails, or no stored profile matches the e-mail.
    """
    invalid = (jsonify({"error": "Invalid credentials"}), 401)

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data.get('email') or not data.get('password'):
        return invalid
    email = data['email']

    try:
        user = auth.get_user_by_email(email)

        # Profiles are written under users/<uid> at registration, so read that
        # record directly instead of downloading every user.
        user_data = db.reference(f'users/{user.uid}').get()
        if not isinstance(user_data, dict) or user_data.get('email') != email:
            # Fall back to the original scan for records stored under a
            # different key.
            all_users = db.reference("users").get() or {}
            user_data = next(
                (record for record in all_users.values()
                 if isinstance(record, dict) and record.get('email') == email),
                None,
            )
    except Exception:
        # Request boundary: any lookup failure is reported as bad credentials.
        return invalid

    if user_data is None:
        # Previously this path fell off the end of the function and returned
        # None, which Flask turns into a server error.
        return invalid

    session['user_details'] = user_data
    session['progress'] = user_data.get('progress', {})
    print(user_data.get('name'))
    return jsonify({"message": "sucess"})
# Home Route (Display User Credentials)
def home():
    """Show a plan recommendation or the current progress page.

    Reads ``session['progress']``.  When the latest ``pushups`` entry is
    missing or zero, a plan is recommended from the BMI computed from
    ``session['user_details']`` (height in centimetres, weight in kilograms -
    presumably; confirm against the registration form) and ``workout.html``
    is rendered with the remaining plans.  Otherwise
    ``currentprogress.html`` is rendered.

    Side effects: resets ``session[today_date]`` to zeroed counters and, on
    the recommendation path, sets ``session['fitness_plan']``.

    Returns:
        The rendered page; ``(json, 401)`` when no user details are in the
        session; ``(json, 400)`` when height/weight are unusable.
    """
    progress = session.get('progress') or {}
    print(progress)

    # Latest push-up entry; anything missing or non-numeric counts as 0.
    try:
        latest_pushups = 0 + progress['pushups'][-1]
    except (KeyError, IndexError, TypeError):
        latest_pushups = 0

    session[today_date] = {"bicepcurls": 0, "squats": 0, "pushups": 0, "shoulderpress": 0, "pullups": 0}

    available_plans = [
        {"name": "fitness maintenance", "description": "Build strength and muscle mass."},
        {"name": "weight loss", "description": "Lose weight through healthy diet and exercise."},
    ]
    # Plan names are passed to the template as lower-case slugs.
    for plan in available_plans:
        plan['name'] = plan['name'].replace(' ', '-').lower()

    print(latest_pushups)
    if latest_pushups != 0:
        return render_template(
            "currentprogress.html",
            sum=(latest_pushups / 3) * 100,
            recommended_plan=progress.get("recommended_plan", ""),
        )

    details = session.get('user_details')
    if not details:
        # The session has no stored profile (e.g. the user never logged in).
        return jsonify({"error": "Not logged in"}), 401

    try:
        height = float(details['height'])
        weight = float(details['weight'])
    except (KeyError, TypeError, ValueError):
        return jsonify({"error": "Height and weight are required to recommend a plan"}), 400
    if height <= 0:
        return jsonify({"error": "Height and weight are required to recommend a plan"}), 400

    bmi = weight / (height / 100) ** 2
    if bmi < 18.5:
        recomm = "weight gain"
    elif 18.5 <= bmi <= 24.9:
        recomm = "fitness maintenance"
    else:
        recomm = "weight loss"

    # Drop the recommended plan from the alternatives by name.  Removing by
    # position raised IndexError for "weight gain", which has no entry in the
    # list above.
    recommended_slug = recomm.replace(' ', '-')
    available_plans = [plan for plan in available_plans if plan['name'] != recommended_slug]

    session['fitness_plan'] = recomm
    return render_template('workout.html', recommended_plan=recomm, available_plans=available_plans)
def upload_video(exercise_name):
    """Save an uploaded exercise video and stream its annotated frames back.

    Args:
        exercise_name: ``'bicepcurls'`` or ``'shoulderpress'``.

    The file is read from the ``video`` form field and saved as
    ``uploads/<exercise_name>_<filename>``.

    Returns:
        A ``multipart/x-mixed-replace`` streaming response on success;
        ``(text, 400)`` when no usable file was sent; ``(text, 404)`` for an
        unknown exercise name.
    """
    processors = {
        'bicepcurls': process_bicep_curls,
        'shoulderpress': shoulder_press_count,
    }
    processor = processors.get(exercise_name)
    if processor is None:
        # Previously an unknown name fell through and returned None.
        return "Unsupported exercise", 404

    if 'video' not in request.files:
        return "No video file uploaded", 400
    video = request.files['video']
    if not video.filename:
        return "No selected file", 400

    # The filename is client-controlled: keep only its final path component
    # so it cannot point outside the uploads directory.
    safe_name = os.path.basename(video.filename.replace("\\", "/"))
    if not safe_name:
        return "No selected file", 400

    # NOTE: the name is prefixed with the exercise but is not unique; a
    # second upload with the same filename overwrites the first.
    os.makedirs("uploads", exist_ok=True)
    video_path = os.path.join("uploads", f"{exercise_name}_{safe_name}")
    video.save(video_path)

    return Response(processor(video_path), mimetype="multipart/x-mixed-replace; boundary=frame")
def diet_data():
    """Return the contents of ``diet.json`` from the app's static folder.

    Returns:
        The file's text, sent unmodified with an ``application/json`` content
        type, or ``(json, 404)`` when the file does not exist.
    """
    diet_path = os.path.join(app.static_folder, 'diet.json')
    try:
        with open(diet_path, 'r', encoding='utf-8') as f:
            payload = f.read()
    except FileNotFoundError:
        return jsonify({"error": "Diet data not found."}), 404

    # A bare string would be served as text/html; label it as JSON.
    return Response(payload, mimetype="application/json")
def diet():
    """Render and return the ``diet.html`` template."""
    page = render_template("diet.html")
    return page
| from feed_back_llm import update_feed_back | |
def daily_report():
    """Refresh the feedback and render ``feed_back.html``.

    Prints the current ``curr_track`` counts, calls ``update_feed_back()``
    (imported from ``feed_back_llm``; its behaviour is not visible here), and
    returns the rendered template.
    """
    print(curr_track)
    update_feed_back()
    return render_template("feed_back.html")
# Exercise list shown on every plan page.  All three plans currently use the
# same exercises and rep targets, so the list is defined once.
_PLAN_WORKOUTS = (
    {"name": "bicepcurls", "reps": 12},
    {"name": "shoulderpress", "reps": 10},
    {"name": "pullups", "reps": 10},
    {"name": "squats", "reps": 15},
)


def _render_workout_list():
    """Render ``workouts_List.html`` with a fresh copy of the shared exercise list."""
    workouts = [dict(workout) for workout in _PLAN_WORKOUTS]
    return render_template("workouts_List.html", exercises=workouts)


def weight_gain():
    """Render the workout list for the weight-gain plan."""
    return _render_workout_list()


def weight_loss():
    """Render the workout list for the weight-loss plan."""
    return _render_workout_list()


def progress():
    """Render ``currentprogress.html`` with a fixed progress value of 70."""
    # NOTE(review): the value is hard-coded rather than computed from stored
    # progress - confirm whether this is a placeholder.
    return render_template("currentprogress.html", sum=70)


def fitness_maintenance():
    """Render the workout list for the fitness-maintenance plan."""
    return _render_workout_list()
if __name__ == '__main__':
    # Debug mode stays on by default, as before, but can now be switched off
    # with FLASK_DEBUG=0.  Do that for any deployment reachable by others:
    # the debug server exposes an interactive debugger.
    app.run(debug=os.environ.get('FLASK_DEBUG', '1') != '0')