the_espada / app.py
HEMANTH
corrected session error
72a3ed4
from flask import Flask, request, jsonify, redirect, url_for, render_template, session, Response
import firebase_admin
from firebase_admin import credentials, auth, db
# from workouts.bicepCurls import bicepcurls
import os
import time
import mediapipe as mp
import cv2
import numpy as np
# Date stamp used as the key of the per-day entry stored in the session.
# NOTE(review): evaluated once at import time, so it goes stale if the
# process keeps running past midnight.
today_date = time.strftime("%Y-%m-%d")

# MediaPipe shortcuts shared by the video-processing generators.
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# Latest rep count per exercise, written by the video generators.
# NOTE(review): module-level state, so it is shared by every client served
# by this process.
curr_track = {}

# Initialize Flask app
app = Flask(__name__)
# The session-signing key can be supplied through FLASK_SECRET_KEY; the
# original literal is kept as the fallback so existing setups still start.
# SECURITY(review): the fallback is short and committed to the repository,
# so session cookies signed with it can be forged - set the variable in any
# real deployment.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', '9177')

# Firebase Admin SDK Initialization
cred = credentials.Certificate("serviceAccountKey.json")
firebase_admin.initialize_app(cred, {
    'databaseURL': 'https://fitnessdb-c9b11-default-rtdb.firebaseio.com/'
})
def calculate_angle(a, b, c):
    """Return the angle at ``b`` formed by the points ``a``-``b``-``c``.

    Each argument is an (x, y) pair. The result is in degrees and folded
    into the range [0, 180].
    """
    first, vertex, last = np.array(a), np.array(b), np.array(c)

    # Direction of each arm as seen from the vertex.
    heading_last = np.arctan2(last[1] - vertex[1], last[0] - vertex[0])
    heading_first = np.arctan2(first[1] - vertex[1], first[0] - vertex[0])

    degrees = np.abs((heading_last - heading_first) * 180.0 / np.pi)
    # A reflex angle is reported as its complement to a full turn.
    return 360 - degrees if degrees > 180.0 else degrees
# Global flag to control the video stream: process_bicep_curls checks it on
# every frame, and the /stop_stream route sets it to True.
stop_processing = False
def is_shoulder_press_correct(landmarks, mp_pose):
    """Judge one frame of a shoulder press, looking at the left arm only.

    ``landmarks`` is indexed with the ``.value`` of ``mp_pose.PoseLandmark``
    members and each entry is read through its ``x`` and ``y`` attributes.

    Returns a ``(feedback, is_correct)`` tuple: a message for the overlay
    and a flag telling whether the frame counts as the correct position.
    """
    def xy(joint):
        point = landmarks[joint.value]
        return [point.x, point.y]

    joints = mp_pose.PoseLandmark
    shoulder = xy(joints.LEFT_SHOULDER)
    elbow = xy(joints.LEFT_ELBOW)
    wrist = xy(joints.LEFT_WRIST)

    # Angle at the elbow between the upper arm and the forearm.
    elbow_angle = calculate_angle(shoulder, elbow, wrist)

    # Smaller y means higher in the frame: wrist above elbow above shoulder.
    stacked_vertically = wrist[1] < elbow[1] < shoulder[1]
    if not stacked_vertically:
        return "Shoulder Press: Incorrect - Alignment", False

    # The arm has to be (almost) fully extended.
    if not 160 <= elbow_angle <= 180:
        return "Shoulder Press: Incorrect - Elbow angle", False

    return "Shoulder Press: Correct", True
# Video processing function
def shoulder_press_count(video_path):
    """Stream annotated frames of a shoulder-press video while counting reps.

    This is a generator: for every frame that can be read and encoded it
    yields one ``multipart/x-mixed-replace`` part (boundary ``frame``)
    holding the JPEG of the frame with feedback, rep count and pose
    landmarks drawn on it.

    A rep is counted each time the posture goes from correct back to
    incorrect. When the stream ends - whether the video ran out or the
    consumer closed the generator early - the capture is released and the
    count is stored in ``curr_track['shoulderpress']``.

    Args:
        video_path: path of the video file handed to ``cv2.VideoCapture``.
    """
    cap = cv2.VideoCapture(video_path)
    count = 0
    rep_started = False

    try:
        with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # MediaPipe expects RGB; OpenCV decodes frames as BGR.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False

                # Process the image for pose detection
                results = pose.process(image)

                # Convert back to BGR for rendering
                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

                if results.pose_landmarks:
                    landmarks = results.pose_landmarks.landmark

                    # Check shoulder press posture
                    feedback, is_correct = is_shoulder_press_correct(landmarks, mp_pose)

                    # A rep starts on the first correct frame and is counted
                    # once the posture is left again.
                    if is_correct and not rep_started:
                        rep_started = True
                    elif not is_correct and rep_started:
                        rep_started = False
                        count += 1

                    # Feedback in green when correct, red otherwise.
                    cv2.putText(image, feedback, (50, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 1,
                                (0, 255, 0) if is_correct else (0, 0, 255), 2, cv2.LINE_AA)

                    # Display rep count
                    cv2.putText(image, f'Reps: {count}', (50, 100),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

                    # Draw landmarks
                    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
                else:
                    # Warn if no landmarks are detected
                    cv2.putText(image, "No body detected", (50, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

                # Encode frame for streaming; skip frames that fail to encode
                # instead of sending a broken part.
                encoded, buffer = cv2.imencode('.jpg', image)
                if not encoded:
                    continue
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
    finally:
        # Runs on normal exhaustion and when the generator is closed at the
        # yield (e.g. the client disconnected), so the capture never leaks
        # and the count is always recorded.
        cap.release()
        curr_track['shoulderpress'] = count
        print("shoulder_press count :", count)
def process_bicep_curls(video_path):
    """Stream annotated frames of a bicep-curl video while counting reps.

    This is a generator: for every frame that can be read and encoded it
    yields one ``multipart/x-mixed-replace`` part (boundary ``frame``)
    holding the JPEG of the frame, resized to 1280x720, with a progress
    bar, the rep counter, the pose landmarks and a "FINISH" box drawn on it.

    Half a rep is added each time the left elbow angle reaches one end of
    the 50-160 degree range after having been at the other end. The loop
    stops when the video ends or when the module-level ``stop_processing``
    flag becomes true. When the stream ends for any reason the capture is
    released and the count is stored in ``curr_track['bicepcurls']``.

    Args:
        video_path: path of the video file handed to ``cv2.VideoCapture``.
    """
    global stop_processing  # Access the global flag to stop the stream

    # Clear a stop request left over from an earlier stream; otherwise every
    # stream after the first /stop_stream call would end on its first frame.
    stop_processing = False

    cap = cv2.VideoCapture(video_path)
    count = 0
    movement_dir = 0  # 0: waiting for full extension, 1: waiting for full curl

    try:
        # Initialize MediaPipe Pose
        with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
            while cap.isOpened():
                if stop_processing:  # If stop_processing flag is set, break the loop
                    break

                ret, frame = cap.read()
                if not ret:
                    break

                frame = cv2.resize(frame, (1280, 720))

                # MediaPipe expects RGB; OpenCV decodes frames as BGR. The RGB
                # copy is only used for detection - all drawing happens on
                # `frame`, so it is not converted back.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False

                # Process the image for pose detection
                results = pose.process(image)

                if results.pose_landmarks:
                    landmarks = results.pose_landmarks.landmark

                    # Extract relevant joints
                    shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x,
                                landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y]
                    elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x,
                             landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y]
                    wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].x,
                             landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].y]

                    # Calculate elbow angle
                    elbow_angle = calculate_angle(shoulder, elbow, wrist)

                    # np.interp clamps outside 50..160, so the percentage is
                    # exactly 0 or 100 at (and beyond) the two extremes.
                    progress_percentage = np.interp(elbow_angle, (50, 160), (0, 100))
                    # Pixel row of the top edge of the filled bar.
                    progress_bar = np.interp(elbow_angle, (50, 160), (650, 100))

                    color = (255, 0, 255)
                    if progress_percentage == 100:
                        color = (0, 255, 0)
                        if movement_dir == 0:
                            count += 0.5
                            movement_dir = 1
                    if progress_percentage == 0:
                        color = (0, 0, 255)
                        if movement_dir == 1:
                            count += 0.5
                            movement_dir = 0

                    # Draw Progress Bar
                    cv2.rectangle(frame, (1100, 100), (1175, 650), color, 3)
                    cv2.rectangle(frame, (1100, int(progress_bar)), (1175, 650), color, cv2.FILLED)
                    cv2.putText(frame, f'{int(progress_percentage)}%', (1100, 75),
                                cv2.FONT_HERSHEY_PLAIN, 4, color, 4)

                    # Draw Counter (only whole reps are shown)
                    cv2.rectangle(frame, (0, 450), (250, 720), (0, 255, 0), cv2.FILLED)
                    cv2.putText(frame, str(int(count)), (45, 670),
                                cv2.FONT_HERSHEY_PLAIN, 15, (255, 0, 0), 30)

                    # Draw landmarks
                    mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

                # Draw Finish Button on the video
                cv2.rectangle(frame, (10, 10), (150, 60), (0, 0, 255), -1)  # Red button
                cv2.putText(frame, "FINISH", (20, 45), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

                # Encode the frame for streaming; skip frames that fail to
                # encode instead of sending a broken part.
                encoded, buffer = cv2.imencode('.jpg', frame)
                if not encoded:
                    continue
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
    finally:
        # Runs on normal exhaustion, on a stop request and when the generator
        # is closed at the yield (e.g. the client disconnected), so the
        # capture never leaks and the count is always recorded.
        cap.release()
        curr_track['bicepcurls'] = count
# Flask route to stop the video feed
@app.route('/stop_stream', methods=['POST'])
def stop_stream():
    """Raise the shared stop flag and acknowledge with a plain-text 200."""
    global stop_processing

    # The streaming loop polls this flag and leaves once it is true.
    stop_processing = True

    acknowledgement = ("Stream stopped", 200)
    return acknowledgement
@app.route('/')
def index():
    """Serve the landing page."""
    landing_page = render_template('index.html')
    return landing_page
# Register Route
@app.route('/register', methods=['POST'])
def register():
    """Create a Firebase Auth user and store the profile in the Realtime DB.

    Expects a JSON object with ``name``, ``email``, ``password``, ``age``,
    ``height``, ``weight``, ``maxPullups``, ``minPullups`` and ``gender``.
    The optional ``pushups``, ``squats``, ``pullups``, ``shoulderpress`` and
    ``bicepcurls`` entries default to empty lists and form the initial
    progress record, which is also placed in the session.

    Returns:
        200 with a success message, or 400 with an ``error`` message when
        the body is not a JSON object, a required field is missing, or
        Firebase rejects the request.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    required_fields = ('name', 'email', 'password', 'age', 'height', 'weight',
                       'maxPullups', 'minPullups', 'gender')
    missing = [field for field in required_fields if field not in data]
    if missing:
        # Answer with a 400 instead of letting a KeyError become a 500.
        return jsonify({"error": "Missing fields: " + ", ".join(missing)}), 400

    # NOTE(review): the request key is 'shoulderpress' but the value is stored
    # under 'shoulder_press', while the rest of this file uses
    # 'shoulderpress' - confirm which spelling readers of this record expect.
    track = {
        "pushups": data.get('pushups', []),
        "squats": data.get('squats', []),
        "pullups": data.get('pullups', []),
        'shoulder_press': data.get('shoulderpress', []),
        'bicepcurls': data.get('bicepcurls', []),
        'recommended_plan': "",
    }
    session['recommended_plan'] = ""
    session['progress'] = track

    try:
        # Create the user in Firebase Authentication
        user = auth.create_user(
            email=data['email'],
            password=data['password'],
            display_name=data['name'],
        )

        # Store additional user data in Firebase Realtime Database; the
        # password itself is only handed to Firebase Auth.
        db.reference(f'users/{user.uid}').set({
            'name': data['name'],
            'email': data['email'],
            'age': data['age'],
            'height': data['height'],
            'weight': data['weight'],
            'maxPullups': data['maxPullups'],
            'minPullups': data['minPullups'],
            'gender': data['gender'],
            'progress': track,
        })
        return jsonify({"message": "User registered successfully!"}), 200
    except Exception as e:
        # Request boundary: report whatever Firebase complained about.
        return jsonify({"error": str(e)}), 400
@app.route('/login', methods=['POST'])
def login():
    """Load the stored profile of an existing user into the session.

    Expects a JSON object with ``email`` and ``password``.

    SECURITY(review): the password is NOT verified. ``auth.get_user_by_email``
    only looks the account up; nothing in this function compares the
    submitted password with anything, so anyone who knows a registered
    e-mail address is logged in. A real check has to happen elsewhere, e.g.
    by having the client sign in with Firebase and sending an ID token that
    is verified here.

    Returns:
        200 with ``{"message": "sucess"}`` after the session is filled,
        400 when e-mail or password is missing from the body, or 401 when
        the account or its database record cannot be found.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'email' not in data or 'password' not in data:
        return jsonify({"error": "Email and password are required"}), 400
    email = data['email']

    try:
        user = auth.get_user_by_email(email)

        # register() stores the profile under users/<uid>, so try a direct
        # lookup before downloading every user record.
        user_data = db.reference(f'users/{user.uid}').get()
        if not isinstance(user_data, dict) or user_data.get('email') != email:
            # Fall back to the scan by e-mail for records kept under other keys.
            all_users = db.reference("users").get()
            candidates = all_users.values() if isinstance(all_users, dict) else ()
            user_data = next(
                (record for record in candidates
                 if isinstance(record, dict) and record.get('email') == email),
                None,
            )
    except Exception:
        # Request boundary: unknown account or Firebase failure.
        return jsonify({"error": "Invalid credentials"}), 401

    if user_data is None:
        # Previously the view fell off the end here and returned None,
        # which Flask turns into a 500.
        return jsonify({"error": "Invalid credentials"}), 401

    session['user_details'] = user_data
    session['progress'] = user_data.get('progress', {})
    return jsonify({"message": "sucess"})
# Home Route (plan recommendation for new users, current progress otherwise)
def _recommend_plan(bmi):
    """Return the plan name recommended for the given body-mass index."""
    if bmi < 18.5:
        return "weight gain"
    if bmi <= 24.9:
        return "fitness maintenance"
    return "weight loss"


@app.route('/home', methods=['GET'])
def home():
    """Show a plan recommendation or the current progress page.

    The latest entry of ``session['progress']['pushups']`` decides the page:

    * missing or 0 - a plan is recommended from the BMI computed out of the
      height (cm) and weight (kg) in ``session['user_details']``, stored in
      ``session['fitness_plan']`` and rendered with ``workout.html``
      together with the remaining plans;
    * anything else - ``currentprogress.html`` is rendered with that value
      scaled by ``/ 3 * 100``.

    Without ``user_details`` in the session the user is redirected to the
    index page; unusable height/weight values give a 400.
    """
    progress = session.get('progress') or {}
    print(progress)

    try:
        latest_pushups = 0 + progress['pushups'][-1]
    except (KeyError, IndexError, TypeError):
        # No usable push-up history yet.
        latest_pushups = 0

    # NOTE(review): this overwrites the day's entry on every visit - confirm
    # that resetting it here is intended.
    session[today_date] = {"bicepcurls": 0, "squats": 0, "pushups": 0, "shoulderpress": 0, "pullups": 0}

    # All three plans must be listed: the recommended one is removed below,
    # and removing "weight gain" used to fail because it was never here.
    available_plans = [
        {"name": "fitness maintenance", "description": "Build strength and muscle mass."},
        {"name": "weight loss", "description": "Lose weight through healthy diet and exercise."},
        {"name": "weight gain", "description": "Gain weight through a calorie surplus and strength training."},
    ]
    # Turn the names into the dash-separated form used by the plan routes.
    for plan in available_plans:
        plan['name'] = plan['name'].replace(' ', '-').lower()

    print(latest_pushups)
    if latest_pushups != 0:
        return render_template(
            "currentprogress.html",
            sum=(latest_pushups / 3) * 100,
            recommended_plan=progress.get("recommended_plan", ""),
        )

    user_details = session.get('user_details')
    if not user_details:
        # Nobody is logged in (or the session expired).
        return redirect(url_for('index'))

    try:
        height = float(user_details['height'])
        weight = float(user_details['weight'])
        bmi = weight / (height / 100) ** 2
    except (KeyError, TypeError, ValueError, ZeroDivisionError):
        return "Height and weight are required to recommend a plan", 400

    recomm = _recommend_plan(bmi)
    # The recommended plan is passed separately, so drop it from the list by
    # name rather than by a position that silently depends on list order.
    recommended_slug = recomm.replace(' ', '-')
    available_plans = [plan for plan in available_plans if plan['name'] != recommended_slug]

    session['fitness_plan'] = recomm
    return render_template('workout.html', recommended_plan=recomm, available_plans=available_plans)
@app.route('/upload-video/<exercise_name>', methods=['POST'])
def upload_video(exercise_name):
    """Save an uploaded exercise video and stream its annotated frames.

    The file is expected in the ``video`` form field. It is written to
    ``uploads/<exercise_name>_<file name>`` and handed to the generator
    registered for the exercise; the generator output is returned as a
    ``multipart/x-mixed-replace`` response.

    Returns:
        The streaming response, or a plain-text 400 when the exercise is
        not supported, no file was sent, or the file has no name.
    """
    processors = {
        'bicepcurls': process_bicep_curls,
        'shoulderpress': shoulder_press_count,
    }
    processor = processors.get(exercise_name)
    if processor is None:
        # Previously the view returned None here, which Flask reports as a
        # 500. The name is not echoed back because it is untrusted input.
        return "Unsupported exercise", 400

    if 'video' not in request.files:
        return "No video file uploaded", 400

    video = request.files['video']
    # Keep only the final path component of the client-supplied name so that
    # something like "../../app.py" cannot escape the uploads directory.
    safe_name = os.path.basename((video.filename or '').replace('\\', '/'))
    if safe_name == '':
        return "No selected file", 400

    # NOTE: the name is not unique - uploading the same file name for the
    # same exercise overwrites the previous upload.
    os.makedirs("uploads", exist_ok=True)
    video_path = os.path.join("uploads", f"{exercise_name}_{safe_name}")
    video.save(video_path)

    return Response(processor(video_path), mimetype="multipart/x-mixed-replace; boundary=frame")
@app.route('/diet-data')
def diet_data():
    """Return the contents of ``diet.json`` from the static folder.

    The file is sent as stored, without being parsed, labelled as
    ``application/json``. A missing file gives a JSON error with status 404.
    """
    diet_path = os.path.join(app.static_folder, 'diet.json')
    try:
        with open(diet_path, 'r', encoding='utf-8') as f:
            payload = f.read()
    except FileNotFoundError:
        return jsonify({"error": "Diet data not found."}), 404

    # A bare string would be sent as text/html; declare the real type.
    return Response(payload, mimetype='application/json')
@app.route('/diet')
def diet():
    """Serve the diet page."""
    diet_page = render_template("diet.html")
    return diet_page
from feed_back_llm import update_feed_back
@app.route("/daily-report")
def daily_report():
    """Refresh the feedback and show the feedback page.

    Prints the rep counts collected so far, calls ``update_feed_back()``
    and renders ``feed_back.html``.
    """
    # TODO(review): planned_reps = 8, planned_resttime = 1 and
    # counted_restime = 2 were assigned here but never read; pass them to
    # update_feed_back() once it accepts them.
    print(curr_track)
    update_feed_back()
    return render_template("feed_back.html")
# Exercise name and target reps shown on every plan page. All three plans
# currently share the same list, so it is defined once.
_PLAN_WORKOUTS = (
    ("bicepcurls", 12),
    ("shoulderpress", 10),
    ("pullups", 10),
    ("squats", 15),
)


def _render_workout_list():
    """Render ``workouts_List.html`` with a fresh copy of the workout list."""
    exercises = [{"name": name, "reps": reps} for name, reps in _PLAN_WORKOUTS]
    return render_template("workouts_List.html", exercises=exercises)


@app.route('/weight-gain')
def weight_gain():
    """Show the workout list of the weight-gain plan."""
    return _render_workout_list()


@app.route('/weight-loss')
def weight_loss():
    """Show the workout list of the weight-loss plan."""
    return _render_workout_list()


@app.route('/progress')
def progress():
    """Show the progress page with a fixed value of 70."""
    return render_template("currentprogress.html", sum=70)


@app.route('/fitness-maintenance')
def fitness_maintenance():
    """Show the workout list of the fitness-maintenance plan."""
    return _render_workout_list()
if __name__ == '__main__':
    # SECURITY(review): debug mode enables the interactive debugger, which
    # lets anyone who can reach the server run code on it. It stays on by
    # default to keep the current development workflow; set FLASK_DEBUG=0
    # (or "false"/"no") wherever the app is reachable by others.
    debug_enabled = os.environ.get('FLASK_DEBUG', '1').lower() not in ('0', 'false', 'no')
    app.run(debug=debug_enabled)