"""BIM-MSL / app.py

Antigravity Bot
Ultra-Optimization: 480px Resize + 15 FPS Throttle for low-bandwidth lag fix
(revision 563bf2e)
"""
import os
import base64
from werkzeug.utils import secure_filename
from flask import Flask, render_template, Response, jsonify, request, session
import cv2
import threading
import time
import atexit
import uuid
import hashlib
from video_processor import GestureRecognizer, GESTURE_NAMES, GESTURE_TRANSLATIONS
# Flask application setup
app = Flask(__name__)
app.secret_key = os.urandom(24)  # random per-boot key: all sessions reset on restart
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max limit
app.config['SESSION_COOKIE_SAMESITE'] = 'None'  # allow the cookie in cross-site contexts (e.g. iframes)
app.config['SESSION_COOKIE_SECURE'] = True  # required when SameSite=None; cookie sent over HTTPS only
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
ALLOWED_EXTENSIONS = {'mp4', 'avi', 'mov', 'webm'}

def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
import numpy as np  # NOTE(review): consider moving this up to the top-of-file import block

# Global session management: one CameraStream per user, keyed by session id
user_sessions = {}  # sid -> {'stream': CameraStream, 'last_activity': float}
sessions_lock = threading.Lock()  # guards every access to user_sessions
MAX_SESSIONS = 50  # hard cap on concurrent processing sessions
SESSION_TIMEOUT = 300 # More generous: 5 minutes inactivity
def get_session_id():
    """Resolve a stable per-user id: cookie-backed session first, fingerprint fallback."""
    # 1. Cookie-based Flask session wins when present
    if 'user_id' in session:
        return session['user_id']
    # 2. Fallback: fingerprint from client IP + User-Agent
    ip_raw = request.headers.get('X-Forwarded-For', request.remote_addr)
    if ip_raw and ',' in ip_raw:
        # Take the first hop of a proxy chain
        ip = ip_raw.split(',')[0].strip()
    else:
        ip = ip_raw
    ua = request.headers.get('User-Agent', '')
    # MD5 keeps the id compact (not used for security)
    digest = hashlib.md5(f"{ip}_{ua}".encode()).hexdigest()
    sid = f"fp_{digest}"
    # Persist so subsequent requests hit the cookie path
    session['user_id'] = sid
    return sid
def is_probe_request():
    """Return True when the User-Agent looks like a bot/health probe that
    shouldn't consume a processing session."""
    ua = request.headers.get('User-Agent', '').lower()
    bot_keywords = ('health', 'huggingface', 'probe', 'bot', 'crawler', 'spider', 'scraping')
    return any(keyword in ua for keyword in bot_keywords)
def get_user_stream():
    """Return the caller's CameraStream, lazily creating one per session.

    Returns None for probe/bot requests or when MAX_SESSIONS is reached,
    so route handlers can answer 503.
    """
    if is_probe_request():
        return None
    sid = get_session_id()
    with sessions_lock:
        if sid not in user_sessions:
            if len(user_sessions) >= MAX_SESSIONS:
                print(f"[WARNING] Max sessions reached ({MAX_SESSIONS}). Rejecting {sid}")
                return None
            # FIX: headers.get('User-Agent') returns None when the header is
            # absent, and slicing None raised TypeError; default to ''.
            ua = request.headers.get('User-Agent', '')
            print(f"[SESSION] New: {sid} (UA: {ua[:50]}...)")
            user_sessions[sid] = {
                'stream': CameraStream(),
                'last_activity': time.time()
            }
        else:
            user_sessions[sid]['last_activity'] = time.time()
        return user_sessions[sid]['stream']
def cleanup_sessions():
    """Background reaper: once a minute, expire sessions idle past SESSION_TIMEOUT."""
    while True:
        time.sleep(60)
        cutoff = time.time() - SESSION_TIMEOUT
        expired = []
        # Phase 1: collect and unregister stale sessions while holding the lock
        with sessions_lock:
            for sid, data in user_sessions.items():
                if data['last_activity'] < cutoff:
                    # Keep the stream object so it can be stopped after release
                    expired.append((sid, data['stream']))
            for sid, _ in expired:
                del user_sessions[sid]
        # Phase 2: stop streams OUTSIDE the lock so thread joins can't hang the server
        for sid, stream in expired:
            print(f"[SESSION] Expired/Cleaning: {sid}")
            stream.stop()

threading.Thread(target=cleanup_sessions, daemon=True).start()
class CameraStream:
    """Per-user video pipeline.

    Owns a capture source (webcam index 0 or a video file path), a background
    processing thread, and the latest annotated frame / prediction state.
    One instance exists per user session; the worker thread stays alive for
    the whole session and is only joined by stop() during session cleanup.
    """

    def __init__(self):
        self.source = 0  # 0 for webcam, string for file path
        self.queue = []  # Playlist queue (remaining video paths)
        self.video = None  # cv2.VideoCapture, opened lazily by the worker thread
        self.recognizer = GestureRecognizer()
        self.running = False
        self.lock = threading.Lock()  # guards output_frame and source switching
        self.last_prediction_update = 0
        self.source_type = 'server'  # Track source type: 'server', 'client', or 'video'
        self.alive = True  # Thread control flag; set False to kill the worker

        # FIX: these buffers were previously only created inside
        # start_source()/stop_source(), risking AttributeError if the worker
        # or _perform_prediction touched them before a source was started.
        self.frame_buffer = []       # sliding landmark window for video files
        self.gloss_predictions = []  # accumulated per-window video predictions
        self._pred_stride = 0        # inference stride counter (predict every 3rd frame)

        # Movement tracking state
        self.prev_landmarks = None
        self.movement_mag = 0
        self.active_gesture = False
        self.quiet_frames = 0
        self.MOVE_THRESHOLD = 0.008
        self.STOP_FRAMES = 10
        self.gesture_frames = []  # Dynamic buffer for "One Gesture" capture

        # FPS tracking
        self.fps = 0
        self.frame_count = 0
        self.fps_start_time = time.time()

        # State variables for user isolation
        self.output_frame = None
        self.latest_prediction = {'gesture': 'READY', 'confidence': 0, 'status': 'DETECTED GESTURE', 'fps': 0}

        # Timed Cycle Mode (1.8s Capture -> 3.0s Cooldown)
        self.cycle_state = 'CAPTURING'
        self.cycle_start_time = time.time()
        self.CAPTURE_DURATION = 1.8
        self.COOLDOWN_DURATION = 3.0

        # Start background worker thread
        self.thread = threading.Thread(target=self.process_frame, args=())
        self.thread.daemon = True
        self.thread.start()

    def start_source(self, source=0, playlist=None):
        """Schedule a new capture source (0 = webcam, str = video file path).

        Opening happens asynchronously in the worker thread so HTTP handlers
        never block on cv2.VideoCapture. *playlist* holds the REMAINING video
        paths to play after *source* finishes (caller pops the first one).
        """
        with self.lock:
            if self.video is not None:
                self.video.release()
            self.queue = playlist if playlist else []
            # CRITICAL: clear prediction buffers when switching sources
            self.frame_buffer = []
            self.gloss_predictions = []
            self.source_type = 'server' if source == 0 else 'video'
            # Reset recognizer internal state for the new stream
            # (FIX: was called twice; a single reset is sufficient)
            self.recognizer.reset_tracking()
            # Reset the timed capture cycle and UI results
            self.latest_prediction['status'] = 'STARTING...'
            self.cycle_state = 'CAPTURING'
            self.cycle_start_time = time.time()
            self.gesture_frames = []
            self.latest_prediction['gesture'] = "READY"
            self.latest_prediction['confidence'] = 0.0
            print(f"Buffers and Cycle cleared for new source (type: {self.source_type})")
            # ASYNC START: just record the source; the background thread opens it
            self.source = source
            self.running = True
            print(f"Async source scheduled: {self.source}")

    def stop_source(self):
        """Pause processing and release the capture without killing the thread."""
        with self.lock:
            self.running = False
            self.queue = []
            self.frame_buffer = []
            self.gloss_predictions = []
            self.latest_prediction['status'] = 'DETECTED GESTURE'
            if self.video is not None:
                self.video.release()
                self.video = None
            print("Source stopped (Thread kept alive for session)")

    def stop(self):
        """Kill the worker thread — called only during session cleanup.

        FIX: a second, later ``def stop`` used to shadow this method, so
        cleanup never set ``alive = False`` and never joined the thread
        (worker threads leaked for every expired session). The duplicate
        definition has been removed; this is now the only ``stop``.
        """
        self.alive = False
        self.stop_source()
        if self.thread.is_alive():
            self.thread.join(timeout=1.0)

    def _perform_prediction(self):
        """Run one model prediction from the current buffers.

        Webcam (source == 0): resample the variable-length gesture_frames
        buffer to a fixed (1, 30, 258) sequence. Video: use the last 30
        frames of the sliding window, left-padding short clips with zeros.
        Updates latest_prediction in place.
        """
        sequence = None
        # WEBCAM: use the dynamic gesture_frames buffer
        if self.source == 0 and self.gesture_frames:
            dynamic_seq = np.array(self.gesture_frames, dtype=np.float32)
            if len(dynamic_seq) > 5:  # minimum frames to consider a gesture
                # Resample to 30 frames x 258 features via linear interpolation
                resampled = cv2.resize(dynamic_seq, (258, 30), interpolation=cv2.INTER_LINEAR)
                sequence = np.expand_dims(resampled, axis=0)
            else:
                print("Gesture too short, skipped.")
                return
        # VIDEO: sliding window buffer, or pad short clips
        elif len(self.frame_buffer) >= 5:  # minimum 5 frames for any prediction
            seq_list = list(self.frame_buffer)
            if len(seq_list) >= 30:
                sequence = np.array(seq_list[-30:], dtype=np.float32)
            else:
                # Left-pad a short clip with zero feature rows
                sequence = np.array(seq_list, dtype=np.float32)
                pad = np.zeros((30 - len(sequence), 258), dtype=np.float32)
                sequence = np.vstack([pad, sequence])
            sequence = np.expand_dims(sequence, axis=0)  # (1, 30, 258)
        if sequence is None:
            return
        try:
            pred_result = self.recognizer.predict_from_sequence(sequence)
            gesture_name = pred_result['gesture_name']
            confidence = pred_result['confidence']
            probs = pred_result['probabilities']
            top_3 = []
            if probs is not None:
                top_indices = probs.argsort()[-3:][::-1]
                top_3 = [{"name": GESTURE_NAMES.get(i, f"G{i}"), "prob": float(probs[i])} for i in top_indices]
            # Low-confidence results are reported as Unknown
            if gesture_name is None or confidence < 0.6:
                gesture_name = "Unknown"
            if self.source == 0:
                # Webcam: immediate update
                self.latest_prediction.update({
                    "gesture": gesture_name,
                    "confidence": confidence,
                    "top_3": top_3,
                    "hands_detected": pred_result.get('hands_detected', False),
                    "pose_detected": pred_result.get('pose_detected', False)
                })
                print(f"Webcam Prediction: {gesture_name} ({confidence:.2%})")
            else:
                # Video: accumulate windows and surface the best one so far
                self.latest_prediction['status'] = 'PROCESSING VIDEO'
                if gesture_name != "Unknown":
                    self.gloss_predictions.append({"gesture": gesture_name, "confidence": confidence, "top_3": top_3})
                if self.gloss_predictions:
                    best_prediction = max(self.gloss_predictions, key=lambda x: x['confidence'])
                    self.latest_prediction.update({
                        "gesture": best_prediction['gesture'],
                        "confidence": best_prediction['confidence'],
                        "top_3": best_prediction.get('top_3', [])
                    })
            # Always ensure FPS is present in the payload
            self.latest_prediction['fps'] = round(self.fps, 1)
        except Exception as e:
            print(f"Error in _perform_prediction: {e}")

    def process_webcam_frame(self, frame):
        """State machine for the webcam timed cycle (client-pushed frames).

        Extracts features from a 480x360 downscale (bandwidth/CPU saving),
        drives the CAPTURING (1.8s) -> COOLDOWN (3.0s) cycle, and returns
        (annotated_frame, latest_prediction) with landmarks drawn on the
        original-size frame.
        """
        now = time.time()
        timestamp_ms = int(now * 1000)
        # 1. Downscale before inference to reduce pipeline load
        high_res_frame = cv2.resize(frame, (480, 360))
        # 2. Extract per-frame results (pose + hands)
        result = self.recognizer.predict(high_res_frame, timestamp_ms=timestamp_ms)
        hands_detected = result.get('hand_result') is not None and result['hand_result'].hand_landmarks and len(result['hand_result'].hand_landmarks) > 0 if result.get('hand_result') else False
        # Always update attention indicators for the HUD
        self.latest_prediction['hands_detected'] = hands_detected
        self.latest_prediction['pose_detected'] = result.get('pose_result') is not None and result['pose_result'].pose_landmarks and len(result['pose_result'].pose_landmarks) > 0
        # 3. Timed cycle state machine
        elapsed = now - self.cycle_start_time
        if self.cycle_state == 'CAPTURING':
            if hands_detected:
                # Buffer features only while hands are visible
                landmarks = self.recognizer.extract_features(high_res_frame, timestamp_ms=timestamp_ms)
                self.gesture_frames.append(landmarks)
            remaining = max(0, self.CAPTURE_DURATION - elapsed)
            self.latest_prediction['status'] = f"RECORDING ({remaining:.1f}s)"
            if elapsed >= self.CAPTURE_DURATION:
                if len(self.gesture_frames) > 5:
                    # Build a fixed (1, 30, 258) sequence: truncate or left-pad with zeros
                    sequence = np.array(self.gesture_frames[-30:]) if len(self.gesture_frames) >= 30 else np.array(self.gesture_frames)
                    if len(sequence) < 30:
                        pad = np.zeros((30 - len(sequence), 258))
                        sequence = np.vstack([pad, sequence])
                    sequence = np.expand_dims(sequence, axis=0)
                    pred_res = self.recognizer.predict_from_sequence(sequence)
                    self.latest_prediction.update({
                        "gesture": pred_res['gesture_name'],
                        "confidence": float(pred_res['confidence']),
                        "top_3": []  # could populate if needed
                    })
                else:
                    self.latest_prediction['gesture'] = "NO HANDS"
                    self.latest_prediction['confidence'] = 0.0
                self.cycle_state = 'COOLDOWN'
                self.cycle_start_time = now
                self.gesture_frames = []
        else:
            # COOLDOWN: show countdown until the next capture window
            remaining = max(0, self.COOLDOWN_DURATION - elapsed)
            self.latest_prediction['status'] = f"NEXT IN {remaining:.1f}s"
            if elapsed >= self.COOLDOWN_DURATION:
                self.cycle_state = 'CAPTURING'
                self.cycle_start_time = now
                self.latest_prediction['status'] = "STARTING..."
        # 4. Draw landmarks on the original-size frame for display
        annotated = self.recognizer.draw_landmarks(frame, result['pose_result'], result['hand_result'])
        return annotated, self.latest_prediction

    def process_frame(self):
        """Worker loop: opens scheduled sources, reads/annotates frames,
        runs strided predictions for videos, and advances playlist queues.
        Runs until self.alive is cleared by stop()."""
        while self.alive:
            # FPS calculation (rolling, once per second)
            self.frame_count += 1
            loop_start = time.time()
            if loop_start - self.fps_start_time > 1.0:
                self.fps = self.frame_count / (loop_start - self.fps_start_time)
                self.frame_count = 0
                self.fps_start_time = loop_start
                self.latest_prediction['fps'] = round(self.fps, 1)
            should_advance = False
            # 0. Async open: HTTP handlers only schedule; the worker opens
            if self.running and self.video is None:
                print(f"Background worker opening source: {self.source}")
                self.video = cv2.VideoCapture(self.source)
                if not self.video.isOpened():
                    print(f"Failed to open source in background: {self.source}")
                    # Advance to the next queued video; webcam failures just retry
                    if self.source != 0:
                        should_advance = True
                else:
                    print(f"Background worker successfully opened: {self.source}")
            if self.running and self.video is not None and self.video.isOpened():
                success, frame = self.video.read()
                if success:
                    # Mirror frame only if webcam
                    if self.source == 0:
                        frame = cv2.flip(frame, 1)
                    # OPTIMIZATION: resize input to 480px width immediately
                    if frame.shape[1] > 480:
                        scale = 480 / frame.shape[1]
                        h = int(frame.shape[0] * scale)
                        frame = cv2.resize(frame, (480, h), interpolation=cv2.INTER_AREA)
                    # Extract landmarks from the current frame
                    timestamp_ms = int(time.time() * 1000)
                    landmarks = self.recognizer.extract_features(frame, timestamp_ms=timestamp_ms)
                    self.frame_buffer.append(landmarks)
                    if len(self.frame_buffer) > 60:  # limit buffer size
                        self.frame_buffer = self.frame_buffer[-60:]
                    try:
                        # Landmarks for visualization
                        _, pose_result, hand_result = self.recognizer.extract_landmarks(frame, timestamp_ms=timestamp_ms)
                        annotated_frame = self.recognizer.draw_landmarks(frame, pose_result, hand_result)
                        # Detection flags for HUD sync during video playback
                        hands_detected = hand_result is not None and hand_result.hand_landmarks and len(hand_result.hand_landmarks) > 0 if hand_result else False
                        pose_detected = pose_result is not None and pose_result.pose_landmarks and len(pose_result.pose_landmarks) > 0
                        self.latest_prediction.update({
                            "hands_detected": hands_detected,
                            "pose_detected": pose_detected
                        })
                        # Publish the display frame under the lock
                        with self.lock:
                            self.output_frame = annotated_frame.copy()
                        # CONTINUOUS PREDICTION FOR VIDEO FILES
                        # OPTIMIZATION: infer only every 3rd frame to save CPU
                        if self.source != 0 and len(self.frame_buffer) >= 30:
                            self._pred_stride += 1
                            if self._pred_stride % 3 == 0:
                                self._perform_prediction()
                                # Slide the window forward for videos
                                self.frame_buffer = self.frame_buffer[10:]
                    except Exception as e:
                        print(f"Error processing frame: {e}")
                    if self.source != 0:
                        # Adaptive frame pacing for ~30 FPS playback
                        process_duration = time.time() - loop_start
                        delay = max(0, 0.033 - process_duration)
                        time.sleep(delay)
                else:
                    # Read failed: end of file for videos, transient for webcam
                    if isinstance(self.source, str):
                        print(f"Physical end of video reached: {self.source}")
                        should_advance = True
                    else:
                        time.sleep(0.1)
            else:
                # Video not opened or not running
                if self.running and isinstance(self.source, str):
                    print(f"Video stream not opened or stopped: {self.source}")
                    should_advance = True
                else:
                    time.sleep(0.1)
            # Queue advancement logic
            if should_advance:
                print(f"Advancing from {self.source}...")
                # Surface the final (best) prediction for the finished video
                if self.gloss_predictions:
                    best_prediction = max(self.gloss_predictions, key=lambda x: x['confidence'])
                    self.latest_prediction.update({
                        "gesture": best_prediction['gesture'],
                        "confidence": best_prediction['confidence'],
                        "top_3": best_prediction.get('top_3', [])
                    })
                    print(f"FINAL GLOSS for {self.source}: {best_prediction['gesture']} ({best_prediction['confidence']:.2%})")
                # Clear buffers for the next video
                self.frame_buffer = []
                self.gloss_predictions = []
                self.prev_landmarks = None
                self.active_gesture = False
                # Release the finished capture
                if self.video:
                    self.video.release()
                if self.queue:
                    self.source = self.queue.pop(0)
                    print(f"Opening next video in queue: {self.source}")
                    self.video = cv2.VideoCapture(self.source)
                    if not self.video.isOpened():
                        print(f"Error: Could not open {self.source}")
                        # Loop again so the next queued video gets a chance
                        continue
                else:
                    print("Playlist finished - no more videos.")
                    self.running = False
                    self.source = None
                    self.video = None
            # Idle sleep when stopped to avoid burning CPU
            if not self.running:
                time.sleep(0.1)
# Cleanup helper kept for reference if needed
def stop():
    """Stop every registered session stream (global shutdown helper)."""
    with sessions_lock:
        for data in user_sessions.values():
            data['stream'].stop()
def generate(stream):
    """MJPEG generator for /video_feed: yields multipart JPEG frames.

    Serves a placeholder image when no source is active, and caps the
    stream at ~15 FPS to save bandwidth.
    """
    # Load placeholder image once (None if the file is missing)
    placeholder_path = os.path.join('static', 'placeholder.png')
    placeholder_frame = cv2.imread(placeholder_path)
    while stream.alive:
        # FIX: copy the frame under the lock but encode/yield OUTSIDE it,
        # so a slow client can no longer stall the processing thread by
        # holding the lock across the network yield.
        with stream.lock:
            frame = None if stream.output_frame is None else stream.output_frame.copy()
        if frame is None:
            # Serve placeholder when no active source
            if placeholder_frame is not None:
                flag, encodedImage = cv2.imencode(".jpg", placeholder_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
                if flag:
                    yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + bytearray(encodedImage) + b'\r\n')
            time.sleep(0.1)
            continue
        flag, encodedImage = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
        if not flag:
            continue
        yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + bytearray(encodedImage) + b'\r\n')
        time.sleep(0.066)  # Limit stream to 15 FPS to save bandwidth
@app.route('/')
def index():
    """Serve the single-page UI."""
    page = render_template('index.html')
    return page
@app.route('/api/status')
def status():
    """Return the latest prediction payload for the caller's session."""
    stream = get_user_stream()
    if stream is not None:
        return jsonify(stream.latest_prediction)
    return jsonify({'error': 'Server Busy (Max Users Reached)', 'busy': True}), 503
@app.route('/api/gestures')
def get_gestures():
    """Expose the gesture id -> name mapping.

    GESTURE_NAMES uses int keys; jsonify serializes them as string keys,
    which is fine for the JSON consumers.
    """
    return jsonify(GESTURE_NAMES)
@app.route('/api/camera/control', methods=['POST'])
def camera_control():
    """Start or stop the session's stream from a webcam, single video, or playlist.

    JSON payload: {"action": "start"|"stop", "source": "webcam"|"video"|"playlist",
    "filename": str, "filenames": [str, ...]}.
    """
    stream = get_user_stream()
    if stream is None:
        return jsonify({'error': 'Server Busy (Max Users Reached)', 'busy': True}), 503
    print("=" * 50)
    print("CAMERA CONTROL ENDPOINT HIT")
    data = request.json
    print(f"Camera Control Payload: {data}")  # DEBUG LOG
    action = data.get('action')
    source_type = data.get('source', 'webcam')
    filename = data.get('filename')
    # FIX: this log line previously printed a literal placeholder instead of
    # the actual filename value.
    print(f"Action: {action}, Source: {source_type}, Filename: {filename}")
    if action == 'stop':
        # Don't kill the thread; just clear the queue and release the source
        print("Stopping camera stream...")
        stream.stop_source()
        return jsonify({"status": "stopped"})
    elif action == 'start':
        if source_type == 'webcam':
            print("Starting webcam...")
            stream.start_source(0)
        elif source_type == 'video' and filename:
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(filename))
            print(f"Starting single video: {filepath}")
            if os.path.exists(filepath):
                stream.start_source(filepath)
            else:
                print(f"File not found: {filepath}")
                return jsonify({"error": "File not found"}), 404
        elif source_type == 'playlist':
            filenames = data.get('filenames', [])
            print(f"Starting playlist with {len(filenames)} files: {filenames}")
            # Keep only files that actually exist in the upload folder
            playlist = []
            for fname in filenames:
                fpath = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(fname))
                print(f"Checking file: {fpath}, exists: {os.path.exists(fpath)}")
                if os.path.exists(fpath):
                    playlist.append(fpath)
            print(f"Valid playlist files: {len(playlist)}")
            if playlist:
                # Start the first clip immediately; queue the remainder
                first_video = playlist.pop(0)
                print(f"Starting first video: {first_video}, queue: {playlist}")
                stream.start_source(first_video, playlist=playlist)
            else:
                print("No valid files in playlist!")
                return jsonify({"error": "No valid files in playlist"}), 400
        print(f"Returning success: started {source_type}")
        return jsonify({"status": "started", "source": source_type})
    print("Invalid action!")
    return jsonify({"error": "Invalid action"}), 400
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Save one or more uploaded video files and optionally autostart playback.

    Accepts either a 'files[]' multi-upload or a single 'file' field. When the
    form field autostart == 'true', the uploaded files are immediately queued
    for playback on the caller's stream.
    """
    # Handle multiple files, falling back to a single 'file' field
    files = request.files.getlist('files[]')
    if not files:
        if 'file' in request.files:
            files = [request.files['file']]
        else:
            return jsonify({"error": "No files provided"}), 400
    uploaded_filenames = []
    for file in files:
        if file.filename == '':
            continue
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(filepath)
            uploaded_filenames.append(filename)
            print(f"Saved: {filepath}")
    if not uploaded_filenames:
        return jsonify({"error": "No valid files uploaded"}), 400
    response = {
        "message": "Files uploaded successfully",
        "filenames": uploaded_filenames
    }
    # Backward compatibility: expose a single 'filename' key for one upload
    if len(uploaded_filenames) == 1:
        response['filename'] = uploaded_filenames[0]
    # Autostart logic.
    # FIX: this block previously sat after an unconditional `return` and was
    # unreachable dead code; restructured to a single exit point so
    # autostart=true actually starts playback.
    if request.form.get('autostart') == 'true':
        stream = get_user_stream()
        if stream:
            print(f"Autostarting upload: {uploaded_filenames}")
            # Same queueing logic as camera_control's playlist branch
            playlist = [os.path.join(app.config['UPLOAD_FOLDER'], f) for f in uploaded_filenames]
            if playlist:
                first = playlist.pop(0)
                stream.start_source(first, playlist=playlist)
                response['status'] = 'started'
                response['source'] = 'playlist' if len(uploaded_filenames) > 1 else 'video'
    return jsonify(response)
@app.route('/video_feed')
def video_feed():
    """Stream the session's annotated frames as a multipart MJPEG response."""
    stream = get_user_stream()
    if stream is not None:
        return Response(generate(stream), mimetype='multipart/x-mixed-replace; boundary=frame')
    return "Server Busy (Max Users reached)", 503
@app.route('/api/process_frame', methods=['POST'])
def process_frame_api():  # Renamed to avoid collision with stream.process_frame
    """Client-side camera path: accept a base64 JPEG frame, run the webcam
    state machine on it, and return the annotated frame plus prediction data.
    """
    stream = get_user_stream()
    if stream is None:
        return jsonify({'error': 'Server Busy', 'busy': True}), 503
    # Session-specific client FPS tracking
    if 'client_fps_start' not in session:
        session['client_fps_start'] = time.time()
        session['client_frame_count'] = 0
        session['client_fps'] = 0.0
    data = request.json
    if 'image' not in data:
        return jsonify({"error": "No image data"}), 400
    # Decode "data:image/jpeg;base64,<payload>" into a BGR frame
    image_data = data['image'].split(',')[1]
    image_bytes = base64.b64decode(image_data)
    nparr = np.frombuffer(image_bytes, np.uint8)
    frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if frame is None:
        return jsonify({"error": "Invalid image"}), 400
    # FIX: removed an unused local timestamp here; process_webcam_frame
    # derives its own timestamp internally.
    annotated_frame, pred_data = stream.process_webcam_frame(frame)
    # Re-encode at low quality to save bandwidth
    _, buffer = cv2.imencode('.jpg', annotated_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
    jpg_as_text = base64.b64encode(buffer).decode('utf-8')
    # Update the rolling per-session FPS once per second
    session['client_frame_count'] += 1
    now = time.time()
    if now - session['client_fps_start'] > 1.0:
        session['client_fps'] = session['client_frame_count'] / (now - session['client_fps_start'])
        session['client_frame_count'] = 0
        session['client_fps_start'] = now
    return jsonify({
        "image": f"data:image/jpeg;base64,{jpg_as_text}",
        "gesture": pred_data['gesture'],
        "confidence": pred_data['confidence'],
        "status": pred_data['status'],
        "hands_detected": pred_data['hands_detected'],
        "pose_detected": pred_data['pose_detected'],
        "fps": round(session['client_fps'], 1)
    })
# Entry point: run the dev server. Reloader stays off so background worker
# threads aren't duplicated by a second interpreter process.
if __name__ == '__main__':
    app.run(port=8181, debug=False, use_reloader=False)