koesan's picture
Update app.py
e278aac verified
import os
import math
import numpy as np
import cv2
from collections import deque
from ultralytics import YOLO
import mediapipe as mp
from flask import Flask, render_template, request, send_file, jsonify, Response
from werkzeug.utils import secure_filename
import zipfile
import shutil
import json
from queue import Queue
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['OUTPUT_FOLDER'] = 'outputs'
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB max file size
# Create necessary folders
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['OUTPUT_FOLDER'], exist_ok=True)
# Progress tracking
progress_queue = Queue()
# Load YOLO model
model = YOLO('yolov8n.pt')
# MediaPipe setup
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose
def calculate_angle(shoulder_center, hip_center):
dy = shoulder_center[1] - hip_center[1]
dx = shoulder_center[0] - hip_center[0]
angle = math.atan2(dy, dx)
return abs(90 - np.degrees(angle))
def classify_posture(torso_angle, standing_threshold=20, horizontal_threshold=50):
if torso_angle < standing_threshold:
return "Standing"
elif torso_angle > horizontal_threshold:
return "Lying"
else:
return "Falling"
def process_video(video_path, output_folder):
camera = cv2.VideoCapture(video_path)
fps = camera.get(cv2.CAP_PROP_FPS)
width = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(camera.get(cv2.CAP_PROP_FRAME_COUNT))
# Process every N frames to speed up (CPU is slow)
PROCESS_EVERY_N_FRAMES = 5 # 5x faster - process 1 out of every 5 frames
output_video_path = os.path.join(output_folder, 'processed_video.mp4')
# Use mp4v codec - most compatible with OpenCV without extra dependencies
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
if not video_writer.isOpened():
print(f"ERROR: Failed to initialize video writer with mp4v codec")
print(f"Video properties: {width}x{height} @ {fps} fps")
raise Exception("Failed to initialize video writer")
fall_counter = 0
fall_detected = False
fall_video_writer = None
fall_video_count = 0
frames_after_fall = 0
frames_after_fall_duration = 10
buffer_size = 10
frame_buffer = deque(maxlen=buffer_size)
fall_videos = []
frame_count = 0
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
while camera.isOpened():
ret, frame = camera.read()
if not ret:
break
frame_count += 1
# Report progress
progress = int((frame_count / total_frames) * 100)
progress_queue.put(progress)
# Skip frames for speed
if frame_count % PROCESS_EVERY_N_FRAMES != 0:
frame_buffer.append(frame)
video_writer.write(frame)
if fall_video_writer is not None:
fall_video_writer.write(frame)
continue
results = model(frame)
for result in results:
for bbox, cls in zip(result.boxes.xyxy, result.boxes.cls):
if int(cls) == 0: # Person class
x1, y1, x2, y2 = map(int, bbox)
person_bbox = frame[y1:y2, x1:x2]
person_bbox_rgb = cv2.cvtColor(person_bbox, cv2.COLOR_BGR2RGB)
person_results = pose.process(person_bbox_rgb)
if person_results.pose_landmarks:
landmarks = person_results.pose_landmarks.landmark
shoulders = [
(landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x * person_bbox.shape[1],
landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y * person_bbox.shape[0]),
(landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x * person_bbox.shape[1],
landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y * person_bbox.shape[0])
]
hips = [
(landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].x * person_bbox.shape[1],
landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].y * person_bbox.shape[0]),
(landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].x * person_bbox.shape[1],
landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].y * person_bbox.shape[0])
]
shoulder_center = ((shoulders[0][0] + shoulders[1][0]) / 2,
(shoulders[0][1] + shoulders[1][1]) / 2)
hip_center = ((hips[0][0] + hips[1][0]) / 2,
(hips[0][1] + hips[1][1]) / 2)
torso_angle = calculate_angle(hip_center, shoulder_center)
posture = classify_posture(torso_angle)
if posture == "Falling":
fall_counter += 1
frames_after_fall = 0
if fall_counter >= 0.1 * fps and fall_counter <= 0.5 * fps and fall_video_writer is None:
fall_video_count += 1
fall_video_file = os.path.join(output_folder, f'fall_{fall_video_count}.mp4')
fall_video_writer = cv2.VideoWriter(fall_video_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
fall_videos.append(fall_video_file)
while frame_buffer:
fall_video_writer.write(frame_buffer.popleft())
else:
fall_counter = 0
if fall_video_writer is not None:
fall_video_writer.write(frame)
if posture == "Standing" and fall_video_writer is not None:
frames_after_fall += 1
frame[y1:y2, x1:x2] = person_bbox
frame_buffer.append(frame)
video_writer.write(frame)
if frames_after_fall > frames_after_fall_duration and fall_video_writer is not None:
fall_video_writer.release()
fall_video_writer = None
camera.release()
video_writer.release()
if fall_video_writer is not None:
fall_video_writer.release()
# Check if video was created successfully
if os.path.exists(output_video_path) and os.path.getsize(output_video_path) > 0:
print(f"SUCCESS: Processed video created: {output_video_path} ({os.path.getsize(output_video_path)} bytes)")
else:
print(f"ERROR: Processed video not created or empty: {output_video_path}")
raise Exception("Failed to create processed video")
return output_video_path, fall_videos
@app.route('/')
def index():
return render_template('index.html')
@app.route('/progress')
def progress():
def generate():
last_progress = 0
while True:
try:
progress = progress_queue.get(timeout=0.5)
last_progress = progress
yield f"data: {json.dumps({'progress': progress})}\n\n"
if progress >= 100:
break
except:
# Keep sending last known progress instead of 0
if last_progress < 100:
yield f"data: {json.dumps({'progress': last_progress})}\n\n"
else:
break
return Response(generate(), mimetype='text/event-stream')
@app.route('/upload', methods=['POST'])
def upload_file():
if 'video' not in request.files:
return jsonify({'error': 'No video file uploaded'}), 400
file = request.files['video']
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
if file:
# Clear progress queue
while not progress_queue.empty():
try:
progress_queue.get_nowait()
except:
break
# Clean previous uploads and outputs
for folder in [app.config['UPLOAD_FOLDER'], app.config['OUTPUT_FOLDER']]:
for filename in os.listdir(folder):
file_path = os.path.join(folder, filename)
try:
if os.path.isfile(file_path):
os.unlink(file_path)
except Exception as e:
print(f"Error deleting {file_path}: {e}")
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
try:
processed_video, fall_videos = process_video(filepath, app.config['OUTPUT_FOLDER'])
# Create zip file with all outputs
zip_path = os.path.join(app.config['OUTPUT_FOLDER'], 'results.zip')
print(f"Creating ZIP file: {zip_path}")
with zipfile.ZipFile(zip_path, 'w') as zipf:
zipf.write(processed_video, 'processed_video.mp4')
print(f"Added to ZIP: processed_video.mp4")
for i, fall_video in enumerate(fall_videos, 1):
zipf.write(fall_video, f'fall_{i}.mp4')
print(f"Added to ZIP: fall_{i}.mp4")
print(f"ZIP created successfully. Fall count: {len(fall_videos)}")
return jsonify({
'success': True,
'fall_count': len(fall_videos),
'download_url': '/download'
})
except Exception as e:
print(f"ERROR in upload: {str(e)}")
import traceback
traceback.print_exc()
return jsonify({'error': str(e)}), 500
@app.route('/download')
def download_file():
zip_path = os.path.join(app.config['OUTPUT_FOLDER'], 'results.zip')
return send_file(zip_path, as_attachment=True, download_name='fall_detection_results.zip')
@app.route('/video/<filename>')
def serve_video(filename):
video_path = os.path.join(app.config['OUTPUT_FOLDER'], filename)
response = send_file(video_path, mimetype='video/mp4')
response.headers['Accept-Ranges'] = 'bytes'
response.headers['Cache-Control'] = 'no-cache'
return response
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=False)