import os import math import numpy as np import cv2 from collections import deque from ultralytics import YOLO import mediapipe as mp from flask import Flask, render_template, request, send_file, jsonify, Response from werkzeug.utils import secure_filename import zipfile import shutil import json from queue import Queue app = Flask(__name__) app.config['UPLOAD_FOLDER'] = 'uploads' app.config['OUTPUT_FOLDER'] = 'outputs' app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB max file size # Create necessary folders os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) os.makedirs(app.config['OUTPUT_FOLDER'], exist_ok=True) # Progress tracking progress_queue = Queue() # Load YOLO model model = YOLO('yolov8n.pt') # MediaPipe setup mp_drawing = mp.solutions.drawing_utils mp_pose = mp.solutions.pose def calculate_angle(shoulder_center, hip_center): dy = shoulder_center[1] - hip_center[1] dx = shoulder_center[0] - hip_center[0] angle = math.atan2(dy, dx) return abs(90 - np.degrees(angle)) def classify_posture(torso_angle, standing_threshold=20, horizontal_threshold=50): if torso_angle < standing_threshold: return "Standing" elif torso_angle > horizontal_threshold: return "Lying" else: return "Falling" def process_video(video_path, output_folder): camera = cv2.VideoCapture(video_path) fps = camera.get(cv2.CAP_PROP_FPS) width = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(camera.get(cv2.CAP_PROP_FRAME_COUNT)) # Process every N frames to speed up (CPU is slow) PROCESS_EVERY_N_FRAMES = 5 # 5x faster - process 1 out of every 5 frames output_video_path = os.path.join(output_folder, 'processed_video.mp4') # Use mp4v codec - most compatible with OpenCV without extra dependencies fourcc = cv2.VideoWriter_fourcc(*'mp4v') video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) if not video_writer.isOpened(): print(f"ERROR: Failed to initialize video writer with mp4v codec") print(f"Video properties: {width}x{height} @ {fps} fps") raise Exception("Failed to initialize video writer") fall_counter = 0 fall_detected = False fall_video_writer = None fall_video_count = 0 frames_after_fall = 0 frames_after_fall_duration = 10 buffer_size = 10 frame_buffer = deque(maxlen=buffer_size) fall_videos = [] frame_count = 0 with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose: while camera.isOpened(): ret, frame = camera.read() if not ret: break frame_count += 1 # Report progress progress = int((frame_count / total_frames) * 100) progress_queue.put(progress) # Skip frames for speed if frame_count % PROCESS_EVERY_N_FRAMES != 0: frame_buffer.append(frame) video_writer.write(frame) if fall_video_writer is not None: fall_video_writer.write(frame) continue results = model(frame) for result in results: for bbox, cls in zip(result.boxes.xyxy, result.boxes.cls): if int(cls) == 0: # Person class x1, y1, x2, y2 = map(int, bbox) person_bbox = frame[y1:y2, x1:x2] person_bbox_rgb = cv2.cvtColor(person_bbox, cv2.COLOR_BGR2RGB) person_results = pose.process(person_bbox_rgb) if person_results.pose_landmarks: landmarks = person_results.pose_landmarks.landmark shoulders = [ (landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x * person_bbox.shape[1], landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y * person_bbox.shape[0]), (landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x * person_bbox.shape[1], landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y * person_bbox.shape[0]) ] hips = [ (landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].x * person_bbox.shape[1], landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].y * person_bbox.shape[0]), (landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].x * person_bbox.shape[1], landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].y * person_bbox.shape[0]) ] shoulder_center = ((shoulders[0][0] + shoulders[1][0]) / 2, (shoulders[0][1] + shoulders[1][1]) / 2) hip_center = ((hips[0][0] + hips[1][0]) / 2, (hips[0][1] + hips[1][1]) / 2) torso_angle = calculate_angle(hip_center, shoulder_center) posture = classify_posture(torso_angle) if posture == "Falling": fall_counter += 1 frames_after_fall = 0 if fall_counter >= 0.1 * fps and fall_counter <= 0.5 * fps and fall_video_writer is None: fall_video_count += 1 fall_video_file = os.path.join(output_folder, f'fall_{fall_video_count}.mp4') fall_video_writer = cv2.VideoWriter(fall_video_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height)) fall_videos.append(fall_video_file) while frame_buffer: fall_video_writer.write(frame_buffer.popleft()) else: fall_counter = 0 if fall_video_writer is not None: fall_video_writer.write(frame) if posture == "Standing" and fall_video_writer is not None: frames_after_fall += 1 frame[y1:y2, x1:x2] = person_bbox frame_buffer.append(frame) video_writer.write(frame) if frames_after_fall > frames_after_fall_duration and fall_video_writer is not None: fall_video_writer.release() fall_video_writer = None camera.release() video_writer.release() if fall_video_writer is not None: fall_video_writer.release() # Check if video was created successfully if os.path.exists(output_video_path) and os.path.getsize(output_video_path) > 0: print(f"SUCCESS: Processed video created: {output_video_path} ({os.path.getsize(output_video_path)} bytes)") else: print(f"ERROR: Processed video not created or empty: {output_video_path}") raise Exception("Failed to create processed video") return output_video_path, fall_videos @app.route('/') def index(): return render_template('index.html') @app.route('/progress') def progress(): def generate(): last_progress = 0 while True: try: progress = progress_queue.get(timeout=0.5) last_progress = progress yield f"data: {json.dumps({'progress': progress})}\n\n" if progress >= 100: break except: # Keep sending last known progress instead of 0 if last_progress < 100: yield f"data: {json.dumps({'progress': last_progress})}\n\n" else: break return Response(generate(), mimetype='text/event-stream') @app.route('/upload', methods=['POST']) def upload_file(): if 'video' not in request.files: return jsonify({'error': 'No video file uploaded'}), 400 file = request.files['video'] if file.filename == '': return jsonify({'error': 'No file selected'}), 400 if file: # Clear progress queue while not progress_queue.empty(): try: progress_queue.get_nowait() except: break # Clean previous uploads and outputs for folder in [app.config['UPLOAD_FOLDER'], app.config['OUTPUT_FOLDER']]: for filename in os.listdir(folder): file_path = os.path.join(folder, filename) try: if os.path.isfile(file_path): os.unlink(file_path) except Exception as e: print(f"Error deleting {file_path}: {e}") filename = secure_filename(file.filename) filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(filepath) try: processed_video, fall_videos = process_video(filepath, app.config['OUTPUT_FOLDER']) # Create zip file with all outputs zip_path = os.path.join(app.config['OUTPUT_FOLDER'], 'results.zip') print(f"Creating ZIP file: {zip_path}") with zipfile.ZipFile(zip_path, 'w') as zipf: zipf.write(processed_video, 'processed_video.mp4') print(f"Added to ZIP: processed_video.mp4") for i, fall_video in enumerate(fall_videos, 1): zipf.write(fall_video, f'fall_{i}.mp4') print(f"Added to ZIP: fall_{i}.mp4") print(f"ZIP created successfully. Fall count: {len(fall_videos)}") return jsonify({ 'success': True, 'fall_count': len(fall_videos), 'download_url': '/download' }) except Exception as e: print(f"ERROR in upload: {str(e)}") import traceback traceback.print_exc() return jsonify({'error': str(e)}), 500 @app.route('/download') def download_file(): zip_path = os.path.join(app.config['OUTPUT_FOLDER'], 'results.zip') return send_file(zip_path, as_attachment=True, download_name='fall_detection_results.zip') @app.route('/video/') def serve_video(filename): video_path = os.path.join(app.config['OUTPUT_FOLDER'], filename) response = send_file(video_path, mimetype='video/mp4') response.headers['Accept-Ranges'] = 'bytes' response.headers['Cache-Control'] = 'no-cache' return response if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=False)