Spaces:
Running
Running
| import os | |
| import math | |
| import numpy as np | |
| import cv2 | |
| from collections import deque | |
| from ultralytics import YOLO | |
| import mediapipe as mp | |
| from flask import Flask, render_template, request, send_file, jsonify, Response | |
| from werkzeug.utils import secure_filename | |
| import zipfile | |
| import shutil | |
| import json | |
| from queue import Queue | |
| app = Flask(__name__) | |
| app.config['UPLOAD_FOLDER'] = 'uploads' | |
| app.config['OUTPUT_FOLDER'] = 'outputs' | |
| app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB max file size | |
| # Create necessary folders | |
| os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) | |
| os.makedirs(app.config['OUTPUT_FOLDER'], exist_ok=True) | |
| # Progress tracking | |
| progress_queue = Queue() | |
| # Load YOLO model | |
| model = YOLO('yolov8n.pt') | |
| # MediaPipe setup | |
| mp_drawing = mp.solutions.drawing_utils | |
| mp_pose = mp.solutions.pose | |
| def calculate_angle(shoulder_center, hip_center): | |
| dy = shoulder_center[1] - hip_center[1] | |
| dx = shoulder_center[0] - hip_center[0] | |
| angle = math.atan2(dy, dx) | |
| return abs(90 - np.degrees(angle)) | |
| def classify_posture(torso_angle, standing_threshold=20, horizontal_threshold=50): | |
| if torso_angle < standing_threshold: | |
| return "Standing" | |
| elif torso_angle > horizontal_threshold: | |
| return "Lying" | |
| else: | |
| return "Falling" | |
| def process_video(video_path, output_folder): | |
| camera = cv2.VideoCapture(video_path) | |
| fps = camera.get(cv2.CAP_PROP_FPS) | |
| width = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| total_frames = int(camera.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| # Process every N frames to speed up (CPU is slow) | |
| PROCESS_EVERY_N_FRAMES = 5 # 5x faster - process 1 out of every 5 frames | |
| output_video_path = os.path.join(output_folder, 'processed_video.mp4') | |
| # Use mp4v codec - most compatible with OpenCV without extra dependencies | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) | |
| if not video_writer.isOpened(): | |
| print(f"ERROR: Failed to initialize video writer with mp4v codec") | |
| print(f"Video properties: {width}x{height} @ {fps} fps") | |
| raise Exception("Failed to initialize video writer") | |
| fall_counter = 0 | |
| fall_detected = False | |
| fall_video_writer = None | |
| fall_video_count = 0 | |
| frames_after_fall = 0 | |
| frames_after_fall_duration = 10 | |
| buffer_size = 10 | |
| frame_buffer = deque(maxlen=buffer_size) | |
| fall_videos = [] | |
| frame_count = 0 | |
| with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose: | |
| while camera.isOpened(): | |
| ret, frame = camera.read() | |
| if not ret: | |
| break | |
| frame_count += 1 | |
| # Report progress | |
| progress = int((frame_count / total_frames) * 100) | |
| progress_queue.put(progress) | |
| # Skip frames for speed | |
| if frame_count % PROCESS_EVERY_N_FRAMES != 0: | |
| frame_buffer.append(frame) | |
| video_writer.write(frame) | |
| if fall_video_writer is not None: | |
| fall_video_writer.write(frame) | |
| continue | |
| results = model(frame) | |
| for result in results: | |
| for bbox, cls in zip(result.boxes.xyxy, result.boxes.cls): | |
| if int(cls) == 0: # Person class | |
| x1, y1, x2, y2 = map(int, bbox) | |
| person_bbox = frame[y1:y2, x1:x2] | |
| person_bbox_rgb = cv2.cvtColor(person_bbox, cv2.COLOR_BGR2RGB) | |
| person_results = pose.process(person_bbox_rgb) | |
| if person_results.pose_landmarks: | |
| landmarks = person_results.pose_landmarks.landmark | |
| shoulders = [ | |
| (landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x * person_bbox.shape[1], | |
| landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y * person_bbox.shape[0]), | |
| (landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x * person_bbox.shape[1], | |
| landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y * person_bbox.shape[0]) | |
| ] | |
| hips = [ | |
| (landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].x * person_bbox.shape[1], | |
| landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].y * person_bbox.shape[0]), | |
| (landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].x * person_bbox.shape[1], | |
| landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].y * person_bbox.shape[0]) | |
| ] | |
| shoulder_center = ((shoulders[0][0] + shoulders[1][0]) / 2, | |
| (shoulders[0][1] + shoulders[1][1]) / 2) | |
| hip_center = ((hips[0][0] + hips[1][0]) / 2, | |
| (hips[0][1] + hips[1][1]) / 2) | |
| torso_angle = calculate_angle(hip_center, shoulder_center) | |
| posture = classify_posture(torso_angle) | |
| if posture == "Falling": | |
| fall_counter += 1 | |
| frames_after_fall = 0 | |
| if fall_counter >= 0.1 * fps and fall_counter <= 0.5 * fps and fall_video_writer is None: | |
| fall_video_count += 1 | |
| fall_video_file = os.path.join(output_folder, f'fall_{fall_video_count}.mp4') | |
| fall_video_writer = cv2.VideoWriter(fall_video_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height)) | |
| fall_videos.append(fall_video_file) | |
| while frame_buffer: | |
| fall_video_writer.write(frame_buffer.popleft()) | |
| else: | |
| fall_counter = 0 | |
| if fall_video_writer is not None: | |
| fall_video_writer.write(frame) | |
| if posture == "Standing" and fall_video_writer is not None: | |
| frames_after_fall += 1 | |
| frame[y1:y2, x1:x2] = person_bbox | |
| frame_buffer.append(frame) | |
| video_writer.write(frame) | |
| if frames_after_fall > frames_after_fall_duration and fall_video_writer is not None: | |
| fall_video_writer.release() | |
| fall_video_writer = None | |
| camera.release() | |
| video_writer.release() | |
| if fall_video_writer is not None: | |
| fall_video_writer.release() | |
| # Check if video was created successfully | |
| if os.path.exists(output_video_path) and os.path.getsize(output_video_path) > 0: | |
| print(f"SUCCESS: Processed video created: {output_video_path} ({os.path.getsize(output_video_path)} bytes)") | |
| else: | |
| print(f"ERROR: Processed video not created or empty: {output_video_path}") | |
| raise Exception("Failed to create processed video") | |
| return output_video_path, fall_videos | |
| def index(): | |
| return render_template('index.html') | |
| def progress(): | |
| def generate(): | |
| last_progress = 0 | |
| while True: | |
| try: | |
| progress = progress_queue.get(timeout=0.5) | |
| last_progress = progress | |
| yield f"data: {json.dumps({'progress': progress})}\n\n" | |
| if progress >= 100: | |
| break | |
| except: | |
| # Keep sending last known progress instead of 0 | |
| if last_progress < 100: | |
| yield f"data: {json.dumps({'progress': last_progress})}\n\n" | |
| else: | |
| break | |
| return Response(generate(), mimetype='text/event-stream') | |
| def upload_file(): | |
| if 'video' not in request.files: | |
| return jsonify({'error': 'No video file uploaded'}), 400 | |
| file = request.files['video'] | |
| if file.filename == '': | |
| return jsonify({'error': 'No file selected'}), 400 | |
| if file: | |
| # Clear progress queue | |
| while not progress_queue.empty(): | |
| try: | |
| progress_queue.get_nowait() | |
| except: | |
| break | |
| # Clean previous uploads and outputs | |
| for folder in [app.config['UPLOAD_FOLDER'], app.config['OUTPUT_FOLDER']]: | |
| for filename in os.listdir(folder): | |
| file_path = os.path.join(folder, filename) | |
| try: | |
| if os.path.isfile(file_path): | |
| os.unlink(file_path) | |
| except Exception as e: | |
| print(f"Error deleting {file_path}: {e}") | |
| filename = secure_filename(file.filename) | |
| filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) | |
| file.save(filepath) | |
| try: | |
| processed_video, fall_videos = process_video(filepath, app.config['OUTPUT_FOLDER']) | |
| # Create zip file with all outputs | |
| zip_path = os.path.join(app.config['OUTPUT_FOLDER'], 'results.zip') | |
| print(f"Creating ZIP file: {zip_path}") | |
| with zipfile.ZipFile(zip_path, 'w') as zipf: | |
| zipf.write(processed_video, 'processed_video.mp4') | |
| print(f"Added to ZIP: processed_video.mp4") | |
| for i, fall_video in enumerate(fall_videos, 1): | |
| zipf.write(fall_video, f'fall_{i}.mp4') | |
| print(f"Added to ZIP: fall_{i}.mp4") | |
| print(f"ZIP created successfully. Fall count: {len(fall_videos)}") | |
| return jsonify({ | |
| 'success': True, | |
| 'fall_count': len(fall_videos), | |
| 'download_url': '/download' | |
| }) | |
| except Exception as e: | |
| print(f"ERROR in upload: {str(e)}") | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({'error': str(e)}), 500 | |
| def download_file(): | |
| zip_path = os.path.join(app.config['OUTPUT_FOLDER'], 'results.zip') | |
| return send_file(zip_path, as_attachment=True, download_name='fall_detection_results.zip') | |
| def serve_video(filename): | |
| video_path = os.path.join(app.config['OUTPUT_FOLDER'], filename) | |
| response = send_file(video_path, mimetype='video/mp4') | |
| response.headers['Accept-Ranges'] = 'bytes' | |
| response.headers['Cache-Control'] = 'no-cache' | |
| return response | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860, debug=False) | |