Spaces:
Sleeping
Sleeping
Antigravity Bot
Ultra-Optimization: 480px Resize + 15 FPS Throttle for low-bandwidth lag fix
563bf2e | import os | |
| import base64 | |
| from werkzeug.utils import secure_filename | |
| from flask import Flask, render_template, Response, jsonify, request, session | |
| import cv2 | |
| import threading | |
| import time | |
| import atexit | |
| import uuid | |
| import hashlib | |
| from video_processor import GestureRecognizer, GESTURE_NAMES, GESTURE_TRANSLATIONS | |
app = Flask(__name__)
# NOTE: secret key is random per process, so all cookie sessions reset on restart.
app.secret_key = os.urandom(24)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max limit
# Cross-site cookie settings: SameSite=None requires Secure, so the session
# cookie still works when the app is embedded (e.g. in an iframe).
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# Video container formats accepted by the upload endpoint.
ALLOWED_EXTENSIONS = {'mp4', 'avi', 'mov', 'webm'}

def allowed_file(filename):
    """Return True when *filename* has a whitelisted video extension (case-insensitive)."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
import numpy as np  # NOTE(review): mid-file import — consider moving to the top import block
# Global session management
# Maps session id -> {'stream': CameraStream, 'last_activity': epoch seconds}.
user_sessions = {}
# Guards all reads/writes of user_sessions across request and cleanup threads.
sessions_lock = threading.Lock()
MAX_SESSIONS = 50
SESSION_TIMEOUT = 300 # More generous: 5 minutes inactivity
def get_session_id():
    """Resolve a stable per-visitor id.

    Prefers the id cached in the cookie-backed Flask session; otherwise
    derives a fingerprint from client IP + User-Agent and caches it.
    """
    # 1. EAFP: reuse the id already stored in the Flask session.
    try:
        return session['user_id']
    except KeyError:
        pass
    # 2. Fallback: browser fingerprint (IP + User-Agent).
    # Take the first address when a proxy chain sets X-Forwarded-For.
    forwarded = request.headers.get('X-Forwarded-For', request.remote_addr)
    if forwarded and ',' in forwarded:
        client_ip = forwarded.split(',')[0].strip()
    else:
        client_ip = forwarded
    agent = request.headers.get('User-Agent', '')
    # MD5 keeps the id compact (not security-sensitive here).
    digest = hashlib.md5(f"{client_ip}_{agent}".encode()).hexdigest()
    sid = f"fp_{digest}"
    # Cache in the session so subsequent requests hit the fast path.
    session['user_id'] = sid
    return sid
def is_probe_request():
    """Detect bots or probes that shouldn't waste processing sessions."""
    agent = request.headers.get('User-Agent', '').lower()
    for keyword in ('health', 'huggingface', 'probe', 'bot', 'crawler', 'spider', 'scraping'):
        if keyword in agent:
            return True
    return False
def get_user_stream():
    """Return the CameraStream for the current visitor, creating one on demand.

    Returns None for probe/bot requests and when MAX_SESSIONS is reached
    (callers translate that into a 503).
    """
    if is_probe_request():
        return None
    sid = get_session_id()
    with sessions_lock:
        if sid not in user_sessions:
            if len(user_sessions) >= MAX_SESSIONS:
                print(f"[WARNING] Max sessions reached ({MAX_SESSIONS}). Rejecting {sid}")
                return None
            # BUGFIX: User-Agent can be absent; slicing None raised TypeError.
            ua = request.headers.get('User-Agent') or ''
            print(f"[SESSION] New: {sid} (UA: {ua[:50]}...)")
            user_sessions[sid] = {
                'stream': CameraStream(),
                'last_activity': time.time()
            }
        else:
            user_sessions[sid]['last_activity'] = time.time()
        return user_sessions[sid]['stream']
def cleanup_sessions():
    """Daemon loop: every minute, evict streams idle longer than SESSION_TIMEOUT."""
    while True:
        time.sleep(60)
        now = time.time()
        expired = []
        # Phase 1: under the lock, find stale entries and drop them from
        # the registry, keeping (sid, stream) so we can stop them later.
        with sessions_lock:
            for sid, data in user_sessions.items():
                if now - data['last_activity'] > SESSION_TIMEOUT:
                    expired.append((sid, data['stream']))
            for sid, _stream in expired:
                del user_sessions[sid]
        # Phase 2: stop streams with the lock RELEASED so blocking joins
        # inside stop() cannot stall request handlers.
        for sid, stream in expired:
            print(f"[SESSION] Expired/Cleaning: {sid}")
            stream.stop()
# Launch the session reaper; daemon=True so it never blocks interpreter exit.
threading.Thread(target=cleanup_sessions, daemon=True).start()
class CameraStream:
    """Per-session video pipeline.

    Owns the capture source (webcam index 0 or a video file path), a
    GestureRecognizer, and one daemon worker thread that reads frames,
    draws landmarks into ``output_frame`` and keeps ``latest_prediction``
    current for the HTTP routes.

    BUGFIX: the class previously defined ``stop`` TWICE; the later, weaker
    definition (which only called stop_source) shadowed the real one, so
    ``alive`` was never cleared and worker threads leaked on session
    cleanup. Only the full, thread-killing version is kept.
    """

    def __init__(self):
        self.source = 0  # 0 for webcam, string for file path
        self.queue = []  # Playlist queue (remaining file paths)
        self.video = None  # cv2.VideoCapture, opened lazily by the worker
        self.recognizer = GestureRecognizer()
        self.running = False  # True while a source should be processed
        self.lock = threading.Lock()  # guards output_frame handoff
        self.last_prediction_update = 0
        self.source_type = 'server'  # Track source type: 'server', 'client', or 'video'
        self.alive = True  # Thread control flag; stop() clears it
        # BUGFIX: these buffers were only created in start_source()/stop_source();
        # initialize them here so no code path can hit AttributeError.
        self.frame_buffer = []  # sliding window of per-frame landmark vectors (videos)
        self.gloss_predictions = []  # accumulated per-window predictions (videos)
        # Movement tracking state
        self.prev_landmarks = None
        self.movement_mag = 0
        self.active_gesture = False
        self.quiet_frames = 0
        self.MOVE_THRESHOLD = 0.008
        self.STOP_FRAMES = 10
        self.gesture_frames = []  # Dynamic buffer for "One Gesture" capture
        # FPS tracking
        self.fps = 0
        self.frame_count = 0
        self.fps_start_time = time.time()
        # State variables for user isolation
        self.output_frame = None
        self.latest_prediction = {'gesture': 'READY', 'confidence': 0, 'status': 'DETECTED GESTURE', 'fps': 0}
        # Timed Cycle Mode (1.8s Capture -> 3.0s Cooldown)
        self.cycle_state = 'CAPTURING'
        self.cycle_start_time = time.time()
        self.CAPTURE_DURATION = 1.8
        self.COOLDOWN_DURATION = 3.0
        # Start background thread
        self.thread = threading.Thread(target=self.process_frame, args=())
        self.thread.daemon = True
        self.thread.start()

    def start_source(self, source=0, playlist=None):
        """Schedule a new source for the worker to open asynchronously.

        The VideoCapture is opened by the background thread so HTTP handlers
        never block. ``playlist`` holds the REMAINING videos: the caller
        (camera_control) pops the first entry and passes it as ``source``.
        """
        with self.lock:
            if self.video is not None:
                self.video.release()
            self.queue = playlist if playlist else []
            # CRITICAL: Clear prediction buffers when switching sources
            self.frame_buffer = []
            self.gloss_predictions = []
            self.source_type = 'server' if source == 0 else 'video'
            # Re-initialize recognizer internal state for the new stream
            # (was called twice in the original; once is sufficient)
            self.recognizer.reset_tracking()
            # Setup cycle parameters for webcam
            self.latest_prediction['status'] = 'STARTING...'
            self.cycle_state = 'CAPTURING'
            self.cycle_start_time = time.time()
            self.gesture_frames = []
            # Reset UI results
            self.latest_prediction['gesture'] = "READY"
            self.latest_prediction['confidence'] = 0.0
            print(f"Buffers and Cycle cleared for new source (type: {self.source_type})")
            # ASYNC START: just set the source and let the background thread open it
            self.source = source
            self.running = True
            print(f"Async source scheduled: {self.source}")

    def stop_source(self):
        """Pause processing and release video capture without killing the thread."""
        with self.lock:
            self.running = False
            self.queue = []
            self.frame_buffer = []
            self.gloss_predictions = []
            self.latest_prediction['status'] = 'DETECTED GESTURE'
            if self.video is not None:
                self.video.release()
                self.video = None
            print("Source stopped (Thread kept alive for session)")

    def stop(self):
        """Kill the worker thread — called only during session cleanup."""
        self.alive = False
        self.stop_source()
        if self.thread.is_alive():
            self.thread.join(timeout=1.0)

    def _perform_prediction(self):
        """Build a (1, 30, 258) sequence from the active buffer and predict.

        Webcam (source == 0) uses the dynamic ``gesture_frames`` buffer,
        resampled to exactly 30 frames; video files use the sliding
        ``frame_buffer`` (last 30 frames, zero-padded at the front for
        short clips).
        """
        sequence = None
        # WEBCAM: use dynamic gesture_frames
        if self.source == 0 and self.gesture_frames:
            dynamic_seq = np.array(self.gesture_frames, dtype=np.float32)
            if len(dynamic_seq) > 5:  # Minimum frames to consider a gesture
                # cv2.resize treats the (frames, 258) array as an image and
                # resamples it to exactly 30 rows (frames).
                resampled = cv2.resize(dynamic_seq, (258, 30), interpolation=cv2.INTER_LINEAR)
                sequence = np.expand_dims(resampled, axis=0)
            else:
                print("Gesture too short, skipped.")
                return
        # VIDEO: use sliding window buffer or handle short clips
        elif len(self.frame_buffer) >= 5:  # Minimum 5 frames for any prediction
            seq_list = list(self.frame_buffer)
            if len(seq_list) >= 30:
                sequence = np.array(seq_list[-30:], dtype=np.float32)
            else:
                # Pad short video clip at the front with zeros
                sequence = np.array(seq_list, dtype=np.float32)
                pad = np.zeros((30 - len(sequence), 258), dtype=np.float32)
                sequence = np.vstack([pad, sequence])
            sequence = np.expand_dims(sequence, axis=0)  # (1, 30, 258)
        if sequence is None:
            return
        try:
            pred_result = self.recognizer.predict_from_sequence(sequence)
            gesture_name = pred_result['gesture_name']
            confidence = pred_result['confidence']
            probs = pred_result['probabilities']
            top_3 = []
            if probs is not None:
                top_indices = probs.argsort()[-3:][::-1]
                top_3 = [{"name": GESTURE_NAMES.get(i, f"G{i}"), "prob": float(probs[i])} for i in top_indices]
            # Below 60% confidence we refuse to commit to a gesture.
            if gesture_name is None or confidence < 0.6:
                gesture_name = "Unknown"
            if self.source == 0:
                # Webcam: immediate update
                self.latest_prediction.update({
                    "gesture": gesture_name,
                    "confidence": confidence,
                    "top_3": top_3,
                    "hands_detected": pred_result.get('hands_detected', False),
                    "pose_detected": pred_result.get('pose_detected', False)
                })
                print(f"Webcam Prediction: {gesture_name} ({confidence:.2%})")
            else:
                # Video: accumulate window results and surface the best one
                self.latest_prediction['status'] = 'PROCESSING VIDEO'
                if gesture_name != "Unknown":
                    self.gloss_predictions.append({"gesture": gesture_name, "confidence": confidence, "top_3": top_3})
                if self.gloss_predictions:
                    best_prediction = max(self.gloss_predictions, key=lambda x: x['confidence'])
                    self.latest_prediction.update({
                        "gesture": best_prediction['gesture'],
                        "confidence": best_prediction['confidence'],
                        "top_3": best_prediction.get('top_3', [])
                    })
            # Always ensure FPS is present
            self.latest_prediction['fps'] = round(self.fps, 1)
        except Exception as e:
            print(f"Error in _perform_prediction: {e}")

    def process_webcam_frame(self, frame):
        """State machine for webcam timed cycle, optimized for cloud CPU.

        Runs one client-posted frame through the recognizer, advances the
        CAPTURING (1.8s) -> COOLDOWN (3.0s) cycle, and returns
        (annotated_frame, latest_prediction).
        """
        now = time.time()
        timestamp_ms = int(now * 1000)
        # OPTIMIZATION: reduce to 480x360 for bandwidth/CPU saving
        high_res_frame = cv2.resize(frame, (480, 360))
        # Extract features (always needed for the pulse/history if capturing)
        result = self.recognizer.predict(high_res_frame, timestamp_ms=timestamp_ms)
        hand_result = result.get('hand_result')
        pose_result = result.get('pose_result')
        # bool() keeps the indicators JSON-friendly (the raw and-chain could
        # otherwise leak an empty landmark list instead of False).
        hands_detected = bool(hand_result is not None and hand_result.hand_landmarks and len(hand_result.hand_landmarks) > 0)
        # Always update attention indicators
        self.latest_prediction['hands_detected'] = hands_detected
        self.latest_prediction['pose_detected'] = bool(pose_result is not None and pose_result.pose_landmarks and len(pose_result.pose_landmarks) > 0)
        # Timed cycle state machine
        elapsed = now - self.cycle_start_time
        if self.cycle_state == 'CAPTURING':
            if hands_detected:
                # Add extracted features to the gesture buffer
                landmarks = self.recognizer.extract_features(high_res_frame, timestamp_ms=timestamp_ms)
                self.gesture_frames.append(landmarks)
            remaining = max(0, self.CAPTURE_DURATION - elapsed)
            self.latest_prediction['status'] = f"RECORDING ({remaining:.1f}s)"
            if elapsed >= self.CAPTURE_DURATION:
                if len(self.gesture_frames) > 5:
                    # Final prediction from the recording window
                    sequence = np.array(self.gesture_frames[-30:]) if len(self.gesture_frames) >= 30 else np.array(self.gesture_frames)
                    # Pad if needed
                    if len(sequence) < 30:
                        pad = np.zeros((30 - len(sequence), 258))
                        sequence = np.vstack([pad, sequence])
                    sequence = np.expand_dims(sequence, axis=0)
                    pred_res = self.recognizer.predict_from_sequence(sequence)
                    self.latest_prediction.update({
                        "gesture": pred_res['gesture_name'],
                        "confidence": float(pred_res['confidence']),
                        "top_3": []  # Could populate if needed
                    })
                else:
                    self.latest_prediction['gesture'] = "NO HANDS"
                    self.latest_prediction['confidence'] = 0.0
                self.cycle_state = 'COOLDOWN'
                self.cycle_start_time = now
                self.gesture_frames = []
        else:
            # COOLDOWN
            remaining = max(0, self.COOLDOWN_DURATION - elapsed)
            self.latest_prediction['status'] = f"NEXT IN {remaining:.1f}s"
            if elapsed >= self.COOLDOWN_DURATION:
                self.cycle_state = 'CAPTURING'
                self.cycle_start_time = now
                self.latest_prediction['status'] = "STARTING..."
        # Generate annotated frame (draw on the original frame size)
        annotated = self.recognizer.draw_landmarks(frame, result['pose_result'], result['hand_result'])
        return annotated, self.latest_prediction

    def process_frame(self):
        """Worker-thread main loop.

        Opens the scheduled source asynchronously, reads/annotates frames,
        runs periodic predictions for video files, and advances through the
        playlist queue when a video ends or fails to open.
        """
        while self.alive:
            # FPS calculation (once per second)
            self.frame_count += 1
            loop_start = time.time()
            if loop_start - self.fps_start_time > 1.0:
                self.fps = self.frame_count / (loop_start - self.fps_start_time)
                self.frame_count = 0
                self.fps_start_time = loop_start
                self.latest_prediction['fps'] = round(self.fps, 1)
            should_advance = False
            # 0. Async video opening (non-blocking for HTTP threads)
            if self.running and self.video is None:
                print(f"Background worker opening source: {self.source}")
                self.video = cv2.VideoCapture(self.source)
                if not self.video.isOpened():
                    print(f"Failed to open source in background: {self.source}")
                    # Only advance for file sources; a failed webcam just retries
                    if self.source != 0:
                        should_advance = True
                else:
                    print(f"Background worker successfully opened: {self.source}")
            if self.running and self.video is not None and self.video.isOpened():
                success, frame = self.video.read()
                if success:
                    # Mirror frame only if webcam
                    if self.source == 0:
                        frame = cv2.flip(frame, 1)
                    # OPTIMIZATION: resize input immediately (target 480px width)
                    if frame.shape[1] > 480:
                        scale = 480 / frame.shape[1]
                        h = int(frame.shape[0] * scale)
                        frame = cv2.resize(frame, (480, h), interpolation=cv2.INTER_AREA)
                    # Extract landmarks from current frame with timestamp
                    timestamp_ms = int(time.time() * 1000)
                    landmarks = self.recognizer.extract_features(frame, timestamp_ms=timestamp_ms)
                    self.frame_buffer.append(landmarks)
                    if len(self.frame_buffer) > 60:  # Limit buffer size
                        self.frame_buffer = self.frame_buffer[-60:]
                    # Process frame for visualization
                    try:
                        _, pose_result, hand_result = self.recognizer.extract_landmarks(frame, timestamp_ms=timestamp_ms)
                        annotated_frame = self.recognizer.draw_landmarks(frame, pose_result, hand_result)
                        # Update detection flags for HUD sync during video playback
                        hands_detected = bool(hand_result is not None and hand_result.hand_landmarks and len(hand_result.hand_landmarks) > 0)
                        pose_detected = bool(pose_result is not None and pose_result.pose_landmarks and len(pose_result.pose_landmarks) > 0)
                        self.latest_prediction.update({
                            "hands_detected": hands_detected,
                            "pose_detected": pose_detected
                        })
                        # Publish the annotated frame for the MJPEG generator
                        with self.lock:
                            self.output_frame = annotated_frame.copy()
                        # CONTINUOUS PREDICTION FOR VIDEO FILES
                        # OPTIMIZATION: predict only every 3rd frame to save CPU
                        if self.source != 0 and len(self.frame_buffer) >= 30:
                            if not hasattr(self, '_pred_stride'):
                                self._pred_stride = 0
                            self._pred_stride += 1
                            if self._pred_stride % 3 == 0:
                                self._perform_prediction()
                                # Slide window for videos
                                self.frame_buffer = self.frame_buffer[10:]
                    except Exception as e:
                        print(f"Error processing frame: {e}")
                    if self.source != 0:
                        # Adaptive frame pacing for ~30 FPS playback
                        process_duration = time.time() - loop_start
                        delay = max(0, 0.033 - process_duration)
                        time.sleep(delay)
                else:
                    # read() failed: either end of a file or a webcam hiccup
                    if isinstance(self.source, str):
                        print(f"Physical end of video reached: {self.source}")
                        should_advance = True
                    else:
                        time.sleep(0.1)
            else:
                # Video not opened or not running
                if self.running and isinstance(self.source, str):
                    print(f"Video stream not opened or stopped: {self.source}")
                    should_advance = True
                else:
                    time.sleep(0.1)
            # Queue advancement logic
            if should_advance:
                print(f"Advancing from {self.source}...")
                # Show final prediction for the finished video
                if self.gloss_predictions:
                    best_prediction = max(self.gloss_predictions, key=lambda x: x['confidence'])
                    self.latest_prediction.update({
                        "gesture": best_prediction['gesture'],
                        "confidence": best_prediction['confidence'],
                        "top_3": best_prediction.get('top_3', [])
                    })
                    print(f"FINAL GLOSS for {self.source}: {best_prediction['gesture']} ({best_prediction['confidence']:.2%})")
                # Clear buffers for the next video
                self.frame_buffer = []
                self.gloss_predictions = []
                self.prev_landmarks = None
                self.active_gesture = False
                # Release current video
                if self.video:
                    self.video.release()
                # Check for next in queue
                if self.queue:
                    self.source = self.queue.pop(0)
                    print(f"Opening next video in queue: {self.source}")
                    self.video = cv2.VideoCapture(self.source)
                    if not self.video.isOpened():
                        print(f"Error: Could not open {self.source}")
                        # Loop again; the next iteration re-triggers advancement
                        # until a valid video is found or the queue is empty.
                        continue
                else:
                    print("Playlist finished - no more videos.")
                    self.running = False
                    self.source = None
                    self.video = None
            # Idle sleep if not running to avoid high CPU
            if not self.running:
                time.sleep(0.1)
def stop():
    """Stop every active per-session stream (global shutdown helper)."""
    with sessions_lock:
        for sid, data in user_sessions.items():
            data['stream'].stop()

# ``atexit`` was imported but never used — this helper was evidently meant
# to run at interpreter shutdown, so register it.
atexit.register(stop)
def generate(stream):
    """MJPEG generator for the video feed: yields multipart JPEG frames.

    Serves a static placeholder while the stream has no output frame, and
    caps delivery at ~15 FPS / JPEG quality 50 to save bandwidth.

    BUGFIX: the JPEG encode (and the placeholder sleep) previously ran while
    holding ``stream.lock``, starving the worker thread that publishes
    frames. The frame is now copied under the lock and encoded outside it.
    """
    part_header = b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n'
    # Load placeholder image once
    placeholder_path = os.path.join('static', 'placeholder.png')
    placeholder_frame = cv2.imread(placeholder_path)
    while stream.alive:
        # Grab a private copy of the latest frame under the lock only.
        with stream.lock:
            frame = None if stream.output_frame is None else stream.output_frame.copy()
        if frame is None:
            # Serve placeholder when no active source
            if placeholder_frame is not None:
                flag, encodedImage = cv2.imencode(".jpg", placeholder_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
                if flag:
                    yield part_header + bytearray(encodedImage) + b'\r\n'
            time.sleep(0.1)
            continue
        flag, encodedImage = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
        if not flag:
            continue
        yield part_header + bytearray(encodedImage) + b'\r\n'
        time.sleep(0.066)  # Limit stream to 15 FPS to save bandwidth
def index():
    """Serve the single-page UI.

    NOTE(review): @app.route decorators are absent from this copy of the
    file — likely lost in extraction; confirm against the original.
    """
    return render_template('index.html')
def status():
    """Return the caller's latest prediction snapshot as JSON (503 at capacity)."""
    stream = get_user_stream()
    if stream is not None:
        return jsonify(stream.latest_prediction)
    return jsonify({'error': 'Server Busy (Max Users Reached)', 'busy': True}), 503
def get_gestures():
    """Expose the gesture id -> name map.

    GESTURE_NAMES has int keys; jsonify serializes them as string keys,
    which is acceptable for the client.
    """
    return jsonify(GESTURE_NAMES)
def camera_control():
    """Start or stop the caller's stream (webcam, single file, or playlist).

    Expects a JSON body: {"action": "start"|"stop", "source": "webcam"|
    "video"|"playlist", "filename": ..., "filenames": [...]}.
    """
    stream = get_user_stream()
    if stream is None:
        return jsonify({'error': 'Server Busy (Max Users Reached)', 'busy': True}), 503
    print("=" * 50)
    print("CAMERA CONTROL ENDPOINT HIT")
    # BUGFIX: request.json raises on a missing/invalid JSON body;
    # tolerate it and fall through to the "Invalid action" response.
    data = request.get_json(silent=True) or {}
    print(f"Camera Control Payload: {data}")  # DEBUG LOG
    action = data.get('action')
    source_type = data.get('source', 'webcam')
    filename = data.get('filename')
    # NOTE(review): this debug line was garbled in the source copy; it is
    # assumed to interpolate the filename — confirm against history.
    print(f"Action: {action}, Source: {source_type}, Filename: {filename}")
    if action == 'stop':
        # Don't strictly stop, just clear queue and stop current source
        print("Stopping camera stream...")
        stream.stop_source()
        return jsonify({"status": "stopped"})
    elif action == 'start':
        if source_type == 'webcam':
            print("Starting webcam...")
            stream.start_source(0)
        elif source_type == 'video' and filename:
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(filename))
            print(f"Starting single video: {filepath}")
            if os.path.exists(filepath):
                stream.start_source(filepath)
            else:
                print(f"File not found: {filepath}")
                return jsonify({"error": "File not found"}), 404
        elif source_type == 'playlist':
            filenames = data.get('filenames', [])
            print(f"Starting playlist with {len(filenames)} files: {filenames}")
            playlist = []
            for fname in filenames:
                fpath = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(fname))
                print(f"Checking file: {fpath}, exists: {os.path.exists(fpath)}")
                if os.path.exists(fpath):
                    playlist.append(fpath)
            print(f"Valid playlist files: {len(playlist)}")
            if playlist:
                # Start the first video immediately, queue the rest
                first_video = playlist.pop(0)
                print(f"Starting first video: {first_video}, queue: {playlist}")
                stream.start_source(first_video, playlist=playlist)
            else:
                print("No valid files in playlist!")
                return jsonify({"error": "No valid files in playlist"}), 400
        print(f"Returning success: started {source_type}")
        return jsonify({"status": "started", "source": source_type})
    print("Invalid action!")
    return jsonify({"error": "Invalid action"}), 400
def upload_file():
    """Accept one or many video uploads; optionally autostart playback.

    BUGFIX: the autostart block was dead code — it sat after an
    unconditional ``return jsonify(response)`` and could never run. It now
    executes before the response is sent. Also removed the unused
    ``is_batch`` local.
    """
    # Handle multiple files; fall back to a single 'file' field.
    files = request.files.getlist('files[]')
    if not files:
        if 'file' in request.files:
            files = [request.files['file']]
        else:
            return jsonify({"error": "No files provided"}), 400
    uploaded_filenames = []
    for file in files:
        if file.filename == '':
            continue
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(filepath)
            uploaded_filenames.append(filename)
            print(f"Saved: {filepath}")
    if not uploaded_filenames:
        return jsonify({"error": "No valid files uploaded"}), 400
    response = {
        "message": "Files uploaded successfully",
        "filenames": uploaded_filenames
    }
    # Ensure backward compatibility and convenience for single uploads
    if len(uploaded_filenames) == 1:
        response['filename'] = uploaded_filenames[0]
    # Autostart Logic (must run BEFORE returning the response)
    if request.form.get('autostart') == 'true':
        stream = get_user_stream()
        if stream:
            print(f"Autostarting upload: {uploaded_filenames}")
            # Same pattern as camera_control's playlist branch
            playlist = [os.path.join(app.config['UPLOAD_FOLDER'], f) for f in uploaded_filenames]
            first = playlist.pop(0)
            stream.start_source(first, playlist=playlist)
            response['status'] = 'started'
            response['source'] = 'playlist' if len(uploaded_filenames) > 1 else 'video'
    return jsonify(response)
def video_feed():
    """Multipart MJPEG endpoint backed by the generate() frame generator."""
    stream = get_user_stream()
    if stream is not None:
        return Response(generate(stream), mimetype='multipart/x-mixed-replace; boundary=frame')
    return "Server Busy (Max Users reached)", 503
def process_frame_api():  # Renamed to avoid collision with stream.process_frame
    """Client-camera path: decode a posted base64 JPEG, run the webcam
    cycle on it, and return the annotated frame plus prediction state.

    BUGFIX: ``split(',')[1]`` raised IndexError on bare-base64 payloads
    (no ``data:...;base64,`` prefix) and b64decode could raise on junk —
    both now return a 400 instead of a 500. Also removed the unused
    ``timestamp_ms`` local.
    """
    stream = get_user_stream()
    if stream is None:
        return jsonify({'error': 'Server Busy', 'busy': True}), 503
    # Session-specific client FPS tracking
    if 'client_fps_start' not in session:
        session['client_fps_start'] = time.time()
        session['client_frame_count'] = 0
        session['client_fps'] = 0.0
    data = request.json
    if 'image' not in data:
        return jsonify({"error": "No image data"}), 400
    # Accept both "data:image/jpeg;base64,<b64>" and bare base64 payloads.
    payload = data['image']
    image_data = payload.split(',', 1)[1] if ',' in payload else payload
    try:
        image_bytes = base64.b64decode(image_data)
    except (ValueError, TypeError):
        return jsonify({"error": "Invalid image"}), 400
    nparr = np.frombuffer(image_bytes, np.uint8)
    frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if frame is None:
        return jsonify({"error": "Invalid image"}), 400
    # Use optimized webcam processing path
    annotated_frame, pred_data = stream.process_webcam_frame(frame)
    # Encode result
    _, buffer = cv2.imencode('.jpg', annotated_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
    jpg_as_text = base64.b64encode(buffer).decode('utf-8')
    session['client_frame_count'] += 1
    now = time.time()
    if now - session['client_fps_start'] > 1.0:
        session['client_fps'] = session['client_frame_count'] / (now - session['client_fps_start'])
        session['client_frame_count'] = 0
        session['client_fps_start'] = now
    return jsonify({
        "image": f"data:image/jpeg;base64,{jpg_as_text}",
        "gesture": pred_data['gesture'],
        "confidence": pred_data['confidence'],
        "status": pred_data['status'],
        "hands_detected": pred_data['hands_detected'],
        "pose_detected": pred_data['pose_detected'],
        "fps": round(session['client_fps'], 1)
    })
# Ensure system initializes
if __name__ == '__main__':
    # Dev entry point: use_reloader=False so module-level worker threads are
    # not spawned twice; debug off for the shared deployment.
    app.run(debug=False, port=8181, use_reloader=False)