"""Flask backend for a fitness app.

Provides user registration/login backed by Firebase, BMI-based plan
recommendation, and MJPEG streaming of uploaded workout videos annotated with
MediaPipe pose landmarks and a repetition counter.
"""
import os
import time

import cv2
import firebase_admin
import mediapipe as mp
import numpy as np
from firebase_admin import auth, credentials, db
from flask import (Flask, Response, jsonify, redirect, render_template,
                   request, session, url_for)

# from workouts.bicepCurls import bicepcurls

# Date captured once at import time. Kept for backward compatibility; request
# handlers compute the current date themselves so it does not go stale.
today_date = time.strftime("%Y-%m-%d")

mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# Last rep counts produced by the video generators, keyed by exercise name.
curr_track = {}

# Global flag polled by the video generators; set by the /stop_stream route.
stop_processing = False

UPLOAD_DIR = "uploads"
MJPEG_MIMETYPE = "multipart/x-mixed-replace; boundary=frame"

# Initialize Flask app
app = Flask(__name__)
# Prefer a secret supplied by the environment; the literal fallback keeps
# existing deployments/sessions working but should not be used in production.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', '9177')

# Firebase Admin SDK Initialization
cred = credentials.Certificate("serviceAccountKey.json")
firebase_admin.initialize_app(cred, {
    'databaseURL': 'https://fitnessdb-c9b11-default-rtdb.firebaseio.com/'
})


def calculate_angle(a, b, c):
    """Return the angle in degrees (0-180) at point ``b`` formed by a-b-c.

    Each argument is an (x, y) pair; only the first two components are used.
    """
    a = np.array(a)  # First point
    b = np.array(b)  # Middle point
    c = np.array(c)  # Last point

    radians = (np.arctan2(c[1] - b[1], c[0] - b[0])
               - np.arctan2(a[1] - b[1], a[0] - b[0]))
    angle = np.abs(radians * 180.0 / np.pi)

    # Fold reflex angles back into the 0-180 range.
    if angle > 180.0:
        angle = 360 - angle
    return angle


def _left_arm_points(landmarks, pose_module):
    """Return [x, y] pairs for the left shoulder, elbow and wrist landmarks."""
    joints = (
        pose_module.PoseLandmark.LEFT_SHOULDER,
        pose_module.PoseLandmark.LEFT_ELBOW,
        pose_module.PoseLandmark.LEFT_WRIST,
    )
    return [[landmarks[j.value].x, landmarks[j.value].y] for j in joints]


def _mjpeg_chunk(image):
    """Encode ``image`` as one multipart JPEG chunk, or None if encoding fails."""
    ok, buffer = cv2.imencode('.jpg', image)
    if not ok:
        return None
    return (b'--frame\r\n'
            b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')


def is_shoulder_press_correct(landmarks, mp_pose):
    """Classify the left-arm posture for a shoulder press.

    Returns a ``(feedback_text, is_correct)`` tuple. The posture counts as
    correct when the wrist is above the elbow, the elbow is above the shoulder
    (image y grows downwards) and the elbow angle is within 160-180 degrees.
    """
    shoulder, elbow, wrist = _left_arm_points(landmarks, mp_pose)

    # Calculate angle at the elbow (shoulder, elbow, wrist)
    elbow_angle = calculate_angle(shoulder, elbow, wrist)

    # Vertical alignment check: wrist higher than elbow, elbow higher than
    # shoulder.
    if not (wrist[1] < elbow[1] and elbow[1] < shoulder[1]):
        return "Shoulder Press: Incorrect - Alignment", False

    # Ensure proper angle range for a shoulder press
    if 160 <= elbow_angle <= 180:
        return "Shoulder Press: Correct", True
    return "Shoulder Press: Incorrect - Elbow angle", False


# Video processing function
def shoulder_press_count(video_path):
    """Yield annotated MJPEG chunks for a shoulder-press video.

    A rep is counted each time the posture goes from correct back to
    incorrect. When the generator finishes (end of video, stop flag, or the
    client disconnecting) the capture is released and the final count is
    stored in ``curr_track['shoulderpress']``.
    """
    global stop_processing
    # A previous /stop_stream call must not kill this new stream.
    stop_processing = False

    cap = cv2.VideoCapture(video_path)
    count = 0
    rep_started = False

    try:
        with mp_pose.Pose(min_detection_confidence=0.5,
                          min_tracking_confidence=0.5) as pose:
            while cap.isOpened() and not stop_processing:
                ret, frame = cap.read()
                if not ret:
                    break

                # Convert the frame to RGB; marking it read-only lets
                # MediaPipe avoid a copy.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False

                # Process the image for pose detection
                results = pose.process(image)

                # Convert back to BGR for rendering
                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

                if results.pose_landmarks:
                    landmarks = results.pose_landmarks.landmark

                    # Check shoulder press posture
                    feedback, is_correct = is_shoulder_press_correct(
                        landmarks, mp_pose)

                    # Count reps on the correct -> incorrect transition.
                    if is_correct and not rep_started:
                        rep_started = True
                    elif not is_correct and rep_started:
                        rep_started = False
                        count += 1

                    # Display feedback (green when correct, red otherwise)
                    cv2.putText(image, feedback, (50, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 1,
                                (0, 255, 0) if is_correct else (0, 0, 255),
                                2, cv2.LINE_AA)

                    # Display rep count
                    cv2.putText(image, f'Reps: {count}', (50, 100),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255),
                                2, cv2.LINE_AA)

                    # Draw landmarks
                    mp_drawing.draw_landmarks(image, results.pose_landmarks,
                                              mp_pose.POSE_CONNECTIONS)
                else:
                    # Warn if no landmarks are detected
                    cv2.putText(image, "No body detected", (50, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255),
                                2, cv2.LINE_AA)

                # Encode frame for streaming; skip frames that fail to encode.
                chunk = _mjpeg_chunk(image)
                if chunk is not None:
                    yield chunk
    finally:
        # Runs on normal completion AND when the client disconnects
        # (GeneratorExit), so the capture handle is never leaked.
        cap.release()
        curr_track['shoulderpress'] = count
        app.logger.info("shoulder_press count: %s", count)


def process_bicep_curls(video_path):
    """Yield annotated MJPEG chunks for a bicep-curl video.

    The left elbow angle is mapped from 50-160 degrees onto 0-100 percent;
    reaching each extreme in alternation adds half a rep. The stream stops at
    end of video, when ``stop_processing`` is set, or when the client
    disconnects; the final count is stored in ``curr_track['bicepcurls']``.
    """
    global stop_processing
    # Reset the flag so an earlier /stop_stream does not end this stream
    # before it starts.
    stop_processing = False

    cap = cv2.VideoCapture(video_path)
    count = 0
    movement_dir = 0  # 0: waiting for the 100% extreme, 1: waiting for 0%

    try:
        # Initialize MediaPipe Pose
        with mp_pose.Pose(min_detection_confidence=0.5,
                          min_tracking_confidence=0.5) as pose:
            while cap.isOpened():
                if stop_processing:  # Stop requested via /stop_stream
                    break

                ret, frame = cap.read()
                if not ret:
                    break

                # The overlay coordinates below assume a 1280x720 canvas.
                frame = cv2.resize(frame, (1280, 720))

                # Convert the frame to RGB for pose detection; all drawing is
                # done on the BGR ``frame``.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False
                results = pose.process(image)

                if results.pose_landmarks:
                    landmarks = results.pose_landmarks.landmark

                    # Extract relevant joints and calculate elbow angle
                    shoulder, elbow, wrist = _left_arm_points(landmarks,
                                                              mp_pose)
                    elbow_angle = calculate_angle(shoulder, elbow, wrist)

                    # Determine movement direction and count reps
                    progress_percentage = np.interp(elbow_angle, (50, 160),
                                                    (0, 100))
                    progress_bar = np.interp(elbow_angle, (50, 160),
                                             (650, 100))

                    color = (255, 0, 255)
                    if progress_percentage == 100:
                        color = (0, 255, 0)
                        if movement_dir == 0:
                            count += 0.5
                            movement_dir = 1
                    if progress_percentage == 0:
                        color = (0, 0, 255)
                        if movement_dir == 1:
                            count += 0.5
                            movement_dir = 0

                    # Draw Progress Bar
                    cv2.rectangle(frame, (1100, 100), (1175, 650), color, 3)
                    cv2.rectangle(frame, (1100, int(progress_bar)),
                                  (1175, 650), color, cv2.FILLED)
                    cv2.putText(frame, f'{int(progress_percentage)}%',
                                (1100, 75), cv2.FONT_HERSHEY_PLAIN, 4,
                                color, 4)

                    # Draw Counter
                    cv2.rectangle(frame, (0, 450), (250, 720), (0, 255, 0),
                                  cv2.FILLED)
                    cv2.putText(frame, str(int(count)), (45, 670),
                                cv2.FONT_HERSHEY_PLAIN, 15, (255, 0, 0), 30)

                    # Draw landmarks
                    mp_drawing.draw_landmarks(frame, results.pose_landmarks,
                                              mp_pose.POSE_CONNECTIONS)

                # Draw Finish Button on the video
                cv2.rectangle(frame, (10, 10), (150, 60), (0, 0, 255),
                              -1)  # Red button
                cv2.putText(frame, "FINISH", (20, 45),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

                # Encode the frame for streaming; skip frames that fail.
                chunk = _mjpeg_chunk(frame)
                if chunk is not None:
                    yield chunk
    finally:
        # Runs even if the client disconnects mid-stream.
        cap.release()
        curr_track['bicepcurls'] = count


# Flask route to stop the video feed
@app.route('/stop_stream', methods=['POST'])
def stop_stream():
    """Ask the running video generator to stop after its current frame."""
    global stop_processing
    stop_processing = True  # Stop the video stream when this route is hit
    return "Stream stopped", 200


@app.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')


_REGISTER_REQUIRED_FIELDS = (
    'name', 'email', 'password', 'age', 'height', 'weight',
    'maxPullups', 'minPullups', 'gender',
)


# Registration Route
@app.route('/register', methods=['POST'])
def register():
    """Create a Firebase Auth user and store the profile under users/<uid>.

    Expects a JSON body containing every field in
    ``_REGISTER_REQUIRED_FIELDS``; per-exercise progress lists are optional.
    Returns 400 with an ``error`` message on invalid input or when Firebase
    rejects the request.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    missing = [key for key in _REGISTER_REQUIRED_FIELDS if key not in data]
    if missing:
        return jsonify(
            {"error": "Missing fields: " + ", ".join(missing)}), 400

    # NOTE(review): the stored key is 'shoulder_press' while the rest of the
    # file uses 'shoulderpress'. Left unchanged because it is persisted
    # data -- confirm against the templates before renaming.
    track = {
        "pushups": data.get('pushups', []),
        "squats": data.get('squats', []),
        "pullups": data.get('pullups', []),
        'shoulder_press': data.get('shoulderpress', []),
        'bicepcurls': data.get('bicepcurls', []),
        'recommended_plan': "",
    }
    session['recommended_plan'] = ""
    session['progress'] = track

    try:
        # Create the user in Firebase Authentication
        user = auth.create_user(
            email=data['email'],
            password=data['password'],
            display_name=data['name'],
        )

        # Store additional user data in Firebase Realtime Database
        db.reference(f'users/{user.uid}').set({
            'name': data['name'],
            'email': data['email'],
            'age': data['age'],
            'height': data['height'],
            'weight': data['weight'],
            'maxPullups': data['maxPullups'],
            'minPullups': data['minPullups'],
            'gender': data['gender'],
            'progress': track,
        })
        return jsonify({"message": "User registered successfully!"}), 200
    except Exception as e:  # route boundary: report the failure to the client
        return jsonify({"error": str(e)}), 400


@app.route('/login', methods=['POST'])
def login():
    """Load the profile for the given email into the session.

    NOTE(security): the submitted password is NOT verified. The Admin SDK's
    ``get_user_by_email`` only looks the account up; checking a password
    requires the Firebase Auth client/REST sign-in API. As written, anyone who
    knows a registered email can log in -- this must be addressed before
    production use.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    email = data.get('email')
    password = data.get('password')
    if not email or not password:
        return jsonify({"error": "Email and password are required"}), 400

    try:
        user = auth.get_user_by_email(email)
        # register() stores each profile at users/<uid>, so read that record
        # directly instead of downloading every user.
        user_data = db.reference(f'users/{user.uid}').get()
    except Exception:  # route boundary: unknown email, bad input, SDK errors
        app.logger.exception("Login lookup failed")
        return jsonify({"error": "Invalid credentials"}), 401

    if not isinstance(user_data, dict):
        # Auth account exists but has no profile record.
        return jsonify({"error": "Invalid credentials"}), 401

    session['user_details'] = user_data
    session['progress'] = user_data.get('progress', {})
    return jsonify({"message": "sucess"})


_FITNESS_PLANS = (
    {"name": "fitness maintenance",
     "description": "Build strength and muscle mass."},
    {"name": "weight loss",
     "description": "Lose weight through healthy diet and exercise."},
)


def _recommend_plan(height, weight):
    """Return the plan name for a BMI computed from height and weight.

    ``height`` is divided by 100 before squaring (presumably centimetres in,
    metres used); ``weight`` is presumably kilograms -- confirm with the
    registration form. Raises ValueError for non-positive or non-numeric
    input.
    """
    height = float(height)
    weight = float(weight)
    if height <= 0 or weight <= 0:
        raise ValueError("height and weight must be positive")

    bmi = weight / (height / 100) ** 2
    if bmi < 18.5:
        return "weight gain"
    if bmi < 25:
        return "fitness maintenance"
    return "weight loss"


# Home Route
@app.route('/home', methods=['GET'])
def home():
    """Show current progress, or a recommended plan for users with none.

    When the latest push-up entry is missing or zero, a plan is recommended
    from the logged-in user's BMI and the remaining plans are offered as
    alternatives. Otherwise the progress page is rendered.
    """
    progress = session.get('progress') or {}

    try:
        latest_pushups = 0 + progress['pushups'][-1]
    except (KeyError, IndexError, TypeError):
        latest_pushups = 0

    # Fresh per-day counters, keyed by the date of this request.
    session[time.strftime("%Y-%m-%d")] = {
        "bicepcurls": 0, "squats": 0, "pushups": 0,
        "shoulderpress": 0, "pullups": 0,
    }

    if latest_pushups != 0:
        return render_template(
            "currentprogress.html",
            sum=(latest_pushups / 3) * 100,
            recommended_plan=progress.get("recommended_plan", ""))

    user_details = session.get('user_details')
    if not user_details:
        # Not logged in: send the visitor back to the landing page.
        return redirect(url_for('index'))

    try:
        recomm = _recommend_plan(user_details['height'],
                                 user_details['weight'])
    except (KeyError, TypeError, ValueError):
        return jsonify({"error": "Invalid height or weight on profile"}), 400

    # Offer every plan except the recommended one, selecting by name so the
    # result does not depend on list positions. Names become URL slugs.
    # NOTE(review): there is no "weight gain" entry in _FITNESS_PLANS even
    # though a /weight-gain route exists -- confirm whether one is missing.
    available_plans = [
        {**plan, "name": plan["name"].replace(' ', '-').lower()}
        for plan in _FITNESS_PLANS
        if plan["name"] != recomm
    ]

    session['fitness_plan'] = recomm
    return render_template('workout.html', recommended_plan=recomm,
                           available_plans=available_plans)


# Maps the exercise name in the URL to the generator that analyses the video.
_EXERCISE_STREAMS = {
    'bicepcurls': process_bicep_curls,
    'shoulderpress': shoulder_press_count,
}


@app.route('/upload-video/<exercise_name>', methods=['POST'])
def upload_video(exercise_name):
    """Save an uploaded workout video and stream the annotated analysis.

    The file is expected in the ``video`` form field. Responds with an MJPEG
    stream, 400 for a missing/unnamed file, or 404 for an unknown exercise.
    """
    stream_factory = _EXERCISE_STREAMS.get(exercise_name)
    if stream_factory is None:
        return "Unsupported exercise", 404

    if 'video' not in request.files:
        return "No video file uploaded", 400

    video = request.files['video']
    # Keep only the final path component so a crafted filename cannot escape
    # the upload directory.
    safe_name = os.path.basename((video.filename or '').replace('\\', '/'))
    if safe_name in ('', '.', '..'):
        return "No selected file", 400

    os.makedirs(UPLOAD_DIR, exist_ok=True)
    video_path = os.path.join(UPLOAD_DIR, f"{exercise_name}_{safe_name}")
    video.save(video_path)

    return Response(stream_factory(video_path), mimetype=MJPEG_MIMETYPE)


@app.route('/diet-data')
def diet_data():
    """Serve static/diet.json as JSON, or a 404 JSON error if it is missing."""
    diet_path = os.path.join(app.static_folder, 'diet.json')
    try:
        with open(diet_path, 'r', encoding='utf-8') as f:
            payload = f.read()
    except FileNotFoundError:
        return jsonify({"error": "Diet data not found."}), 404
    return Response(payload, mimetype='application/json')


@app.route('/diet')
def diet():
    """Render the diet page."""
    return render_template("diet.html")


# Kept at its original position in the module.
# NOTE(review): move to the top-level import group if feed_back_llm does not
# import from this module.
from feed_back_llm import update_feed_back  # noqa: E402


@app.route("/daily-report")
def daily_report():
    """Refresh the feedback via ``update_feed_back`` and render the report."""
    app.logger.debug("current track: %s", curr_track)
    update_feed_back()
    return render_template("feed_back.html")


# Workout list currently shared by every plan page.
_DEFAULT_WORKOUTS = (
    {"name": "bicepcurls", "reps": 12},
    {"name": "shoulderpress", "reps": 10},
    {"name": "pullups", "reps": 10},
    {"name": "squats", "reps": 15},
)


def _render_workout_list():
    """Render the workout list template with a fresh copy of the workouts."""
    return render_template("workouts_List.html",
                           exercises=[dict(w) for w in _DEFAULT_WORKOUTS])


@app.route('/weight-gain')
def weight_gain():
    """Render the workout list for the weight-gain plan."""
    return _render_workout_list()


@app.route('/weight-loss')
def weight_loss():
    """Render the workout list for the weight-loss plan."""
    return _render_workout_list()


@app.route('/progress')
def progress():
    """Render the progress page with a fixed placeholder value of 70."""
    return render_template("currentprogress.html", sum=70)


@app.route('/fitness-maintenance')
def fitness_maintenance():
    """Render the workout list for the fitness-maintenance plan."""
    return _render_workout_list()


if __name__ == '__main__':
    app.run(debug=True)