| """ |
| NETRA - Video Surveillance and Analysis Web Application |
| Main Flask Application |
| """ |
|
|
| from flask import Flask, render_template, request, jsonify, session, redirect, url_for, Response, send_from_directory |
| from flask_sqlalchemy import SQLAlchemy |
| from werkzeug.security import generate_password_hash, check_password_hash |
| from werkzeug.utils import secure_filename |
| from werkzeug.middleware.proxy_fix import ProxyFix |
| import cv2 |
| import numpy as np |
| import os |
| import sys |
| from datetime import datetime, date |
| import base64 |
| from pathlib import Path |
| import threading |
| import queue |
| import mimetypes |
| import json |
| import uuid |
| import shutil |
| import tempfile |
|
|
| |
| |
| |
|
|
| def detect_project_root(): |
| """ |
| Dynamically detect PROJECT_ROOT by looking for config/ and src/ directories |
| Works on any device, any installation path |
| """ |
| current = Path(__file__).parent.parent |
| if (current / 'config').exists() and (current / 'src').exists(): |
| return current |
| |
| |
| current = Path(__file__).parent |
| while current != current.parent: |
| if (current / 'config').exists() and (current / 'src').exists(): |
| return current |
| current = current.parent |
| |
| |
| return Path.cwd() |
|
|
| PROJECT_ROOT = detect_project_root() |
|
|
| |
| if str(PROJECT_ROOT) not in sys.path: |
| sys.path.insert(0, str(PROJECT_ROOT)) |
|
|
| |
| WEBAPP_FOLDER = PROJECT_ROOT / 'webapp' |
|
|
| print(f"\n{'='*60}") |
| print(f"π PROJECT CONFIGURATION (Portable Setup)") |
| print(f"{'='*60}") |
| print(f"π PROJECT_ROOT: {PROJECT_ROOT}") |
| print(f"π WEBAPP_FOLDER: {WEBAPP_FOLDER}") |
| print(f"{'='*60}\n") |
|
|
| |
| from src import ( |
| ViolenceDetector, |
| YOLODetector, |
| WeaponPersonDetector, |
| PoseDetection, |
| AnomalyDetector, |
| VideoCapture, |
| ) |
|
|
| |
| from config import ( |
| MODEL_PATHS, |
| get_model_path, |
| DETECTION_THRESHOLDS, |
| PROCESSING_PARAMS, |
| SECRET_KEY, |
| DATABASE_URI, |
| MAX_CONTENT_LENGTH, |
| UPLOAD_FOLDER, |
| PROCESSED_FOLDER, |
| ) |
|
|
| |
| from src.utils.model_downloader import setup_all_models |
|
|
| |
| |
| setup_all_models() |
|
|
| |
| def get_video_codec(): |
| """ |
| Select the best video codec for cross-platform compatibility. |
| XVID is widely supported on Windows, Mac, and Linux. |
| """ |
| try: |
| |
| codec = cv2.VideoWriter_fourcc(*'XVID') |
| |
| test_path = str(Path(tempfile.gettempdir()) / 'codec_test.avi') |
| test_writer = cv2.VideoWriter(test_path, codec, 1.0, (640, 480)) |
| if test_writer.isOpened(): |
| test_writer.release() |
| if os.path.exists(test_path): |
| os.remove(test_path) |
| return codec, '.avi' |
| except Exception: |
| pass |
| |
| try: |
| |
| codec = cv2.VideoWriter_fourcc(*'MJPG') |
| test_path = str(Path(tempfile.gettempdir()) / 'codec_test.avi') |
| test_writer = cv2.VideoWriter(test_path, codec, 1.0, (640, 480)) |
| if test_writer.isOpened(): |
| test_writer.release() |
| if os.path.exists(test_path): |
| os.remove(test_path) |
| return codec, '.avi' |
| except Exception: |
| pass |
| |
| |
| return cv2.VideoWriter_fourcc(*'XVID'), '.avi' |
|
|
| app = Flask(__name__, template_folder=str(WEBAPP_FOLDER / 'templates'), |
| static_folder=str(WEBAPP_FOLDER / 'static')) |
| app.config['SECRET_KEY'] = SECRET_KEY |
| app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URI |
| app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False |
| app.config['UPLOAD_FOLDER'] = str(UPLOAD_FOLDER) |
| app.config['PROCESSED_FOLDER'] = str(PROCESSED_FOLDER) |
| app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH |
|
|
| |
| |
| app.config['SESSION_COOKIE_SECURE'] = True |
| app.config['SESSION_COOKIE_HTTPONLY'] = True |
| app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' |
| app.config['PERMANENT_SESSION_LIFETIME'] = 86400 |
| app.config['SESSION_REFRESH_EACH_REQUEST'] = True |
|
|
| |
| |
| |
| app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1, x_prefix=1) |
|
|
| |
| |
| os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) |
| os.makedirs(app.config['PROCESSED_FOLDER'], exist_ok=True) |
|
|
| db = SQLAlchemy(app) |
|
|
| |
| class User(db.Model): |
| """User model for authentication""" |
| id = db.Column(db.Integer, primary_key=True) |
| username = db.Column(db.String(80), unique=True, nullable=False) |
| email = db.Column(db.String(120), unique=True, nullable=False) |
| password_hash = db.Column(db.String(200), nullable=False) |
| created_at = db.Column(db.DateTime, default=datetime.utcnow) |
| |
| def set_password(self, password): |
| self.password_hash = generate_password_hash(password) |
| |
| def check_password(self, password): |
| return check_password_hash(self.password_hash, password) |
|
|
|
|
| class AnalysisHistory(db.Model): |
| """Store analysis history""" |
| id = db.Column(db.Integer, primary_key=True) |
| user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
| analysis_type = db.Column(db.String(50), nullable=False) |
| filename = db.Column(db.String(200)) |
| detections = db.Column(db.Text) |
| alert_count = db.Column(db.Integer, default=0) |
| detection_count = db.Column(db.Integer, default=0) |
| duration = db.Column(db.Integer, default=0) |
| models_used = db.Column(db.Text) |
| created_at = db.Column(db.DateTime, default=datetime.utcnow) |
| ended_at = db.Column(db.DateTime) |
| |
| original_filename = db.Column(db.String(200)) |
| processed_video_url = db.Column(db.String(500)) |
| preview_image_url = db.Column(db.String(500)) |
| emergency_frames = db.Column(db.Text) |
| total_frames = db.Column(db.Integer, default=0) |
| processed_frames = db.Column(db.Integer, default=0) |
| frame_summaries = db.Column(db.Text) |
|
|
|
|
| class PersonDetection(db.Model): |
| """Store person detection records for live camera monitoring""" |
| id = db.Column(db.Integer, primary_key=True) |
| user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
| person_count = db.Column(db.Integer, default=0) |
| is_present = db.Column(db.Boolean, default=False) |
| confidence = db.Column(db.Float, default=0.0) |
| detection_details = db.Column(db.Text) |
| detected_at = db.Column(db.DateTime, default=datetime.utcnow) |
| session_date = db.Column(db.Date, default=datetime.utcnow) |
| |
| def __repr__(self): |
| return f'<PersonDetection {self.id}: {self.person_count} people on {self.detected_at}>' |
|
|
|
|
| class DetectionHistory(db.Model): |
| """Store detection screenshots for weapons, unusual activity, and risks""" |
| id = db.Column(db.Integer, primary_key=True) |
| user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
| detection_type = db.Column(db.String(50), nullable=False) |
| alert_level = db.Column(db.String(20), default='MEDIUM') |
| confidence = db.Column(db.Float, default=0.0) |
| image_filename = db.Column(db.String(200), nullable=False) |
| detection_details = db.Column(db.Text) |
| detected_at = db.Column(db.DateTime, default=datetime.utcnow) |
| session_date = db.Column(db.Date, default=datetime.utcnow) |
| |
| def __repr__(self): |
| return f'<DetectionHistory {self.id}: {self.detection_type} on {self.detected_at}>' |
|
|
|
|
| class LiveSession(db.Model): |
| """Store live monitoring session recordings and metadata""" |
| id = db.Column(db.Integer, primary_key=True) |
| user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
| session_name = db.Column(db.String(200)) |
| video_filename = db.Column(db.String(200), nullable=False) |
| preview_image = db.Column(db.String(200)) |
| detection_count = db.Column(db.Integer, default=0) |
| alert_count = db.Column(db.Integer, default=0) |
| threat_count = db.Column(db.Integer, default=0) |
| duration = db.Column(db.Integer, default=0) |
| total_frames = db.Column(db.Integer, default=0) |
| models_used = db.Column(db.Text) |
| detections_summary = db.Column(db.Text) |
| alerts_summary = db.Column(db.Text) |
| emergency_frames = db.Column(db.Text) |
| created_at = db.Column(db.DateTime, default=datetime.utcnow) |
| ended_at = db.Column(db.DateTime) |
| is_critical = db.Column(db.Boolean, default=False) |
| notes = db.Column(db.Text) |
| |
| def __repr__(self): |
| return f'<LiveSession {self.id}: {self.session_name} on {self.created_at}>' |
|
|
|
|
| |
| class ModelManager: |
| """Manages all AI models for detection""" |
| |
| def __init__(self): |
| self.models = {} |
| self.load_models() |
| |
| def load_models(self): |
| """Load all available models from ai_models/ directory""" |
| ai_models_dir = PROJECT_ROOT / 'ai_models' |
| |
| print(f"\nπ Looking for models in: {ai_models_dir}") |
| print(f"π Directory exists: {ai_models_dir.exists()}") |
| |
| |
| if ai_models_dir.exists(): |
| print(f"π Contents of ai_models:") |
| for item in ai_models_dir.rglob("*"): |
| if item.is_file(): |
| print(f" - {item.relative_to(ai_models_dir)}") |
| |
| def find_model_file(patterns): |
| """Find model file matching any of the patterns""" |
| for pattern in patterns: |
| |
| path = ai_models_dir / pattern |
| if path.exists(): |
| return path |
| |
| |
| path = ai_models_dir / 'ai_models' / pattern |
| if path.exists(): |
| return path |
| |
| return None |
| |
| try: |
| |
| violence_path = find_model_file(['activity_recognition/violence_model.h5']) |
| if violence_path: |
| self.models['violence'] = ViolenceDetector() |
| print(f"β Violence Detection loaded from: {violence_path.relative_to(PROJECT_ROOT)}") |
| else: |
| print(f"β Violence model not found") |
| except Exception as e: |
| print(f"β Error loading violence model: {e}") |
| |
| try: |
| |
| yolo_path = find_model_file(['object_detection/yolov8n.pt']) |
| if yolo_path: |
| self.models['yolo'] = YOLODetector(model_path=str(yolo_path)) |
| print(f"β YOLO Object Detection loaded from: {yolo_path.relative_to(PROJECT_ROOT)}") |
| else: |
| print(f"β YOLO model not found") |
| except Exception as e: |
| print(f"β Error loading YOLO model: {e}") |
| |
| try: |
| |
| gun_path = find_model_file(['weapon_detection/best.pt']) |
| person_path = find_model_file(['object_detection/yolov8n.pt']) |
|
|
| if gun_path and person_path: |
| self.models['weapon'] = WeaponPersonDetector( |
| gun_model_path=str(gun_path), |
| person_model_path=str(person_path) |
| ) |
| print(f"β Weapon Detection loaded") |
| else: |
| print(f"β Weapon detection models not found") |
| except Exception as e: |
| print(f"β Error loading weapon model: {e}") |
|
|
| try: |
| |
| pose_path = find_model_file(['pose_detection/yolo11n-pose.pt']) |
| if pose_path: |
| self.models['pose'] = PoseDetection(model_path=str(pose_path)) |
| print(f"β Pose Detection loaded from: {pose_path.relative_to(PROJECT_ROOT)}") |
| else: |
| print(f"β Pose model not found") |
| except Exception as e: |
| print(f"β Error loading pose model: {e}") |
| |
| try: |
| |
| anomaly_path = find_model_file(['weapon_detection/best.bin']) |
| if anomaly_path: |
| try: |
| self.models['anomaly'] = AnomalyDetector(model_path=str(anomaly_path)) |
| print(f"β Anomaly Detection loaded") |
| except Exception as init_error: |
| print(f"β Failed to init anomaly detector: {init_error}") |
| else: |
| print(f"β Anomaly model not found") |
| except Exception as e: |
| print(f"β Error with anomaly model: {e}") |
| |
| try: |
| |
| analysis_path = find_model_file([ |
| 'analysis_models/fight_detection_model.h5', |
| 'analysis_models/CustomCNN.h5', |
| 'analysis_models/binarycnn200.h5' |
| ]) |
| if analysis_path: |
| try: |
| from tensorflow import keras |
| self.models['analysis'] = keras.models.load_model(str(analysis_path)) |
| print(f"β Fight/Behavior Detection loaded from: {analysis_path.name}") |
| except Exception as init_error: |
| print(f"β Failed to load analysis model: {init_error}") |
| else: |
| print(f"β Analysis models not found") |
| except Exception as e: |
| print(f"β Error loading analysis model: {e}") |
| |
| try: |
| |
| lstm_path = find_model_file([ |
| 'LSTM Model/MobBiLSTM_model_saved101.keras', |
| 'LSTM_Model/MobBiLSTM_model_saved101.keras' |
| ]) |
| if lstm_path: |
| try: |
| from tensorflow import keras |
| self.models['lstm'] = keras.models.load_model(str(lstm_path), compile=False) |
| print(f"β LSTM Sequence Analysis loaded") |
| except Exception as lstm_error: |
| print(f"β Failed to load LSTM model: {lstm_error}") |
| else: |
| print(f"β LSTM model not found") |
| except Exception as e: |
| print(f"β Error with LSTM model: {e}") |
| |
| print(f"\nβ Total models loaded: {len(self.models)}\n") |
| |
| def add_model(self, model_name, model_instance): |
| """Add a new model dynamically""" |
| self.models[model_name] = model_instance |
| |
| def get_model(self, model_name): |
| """Get a specific model""" |
| return self.models.get(model_name) |
| |
| def list_models(self): |
| """List all available models""" |
| return list(self.models.keys()) |
| |
| def get_model_details(self): |
| """Get details about all available models (loaded and available)""" |
| model_info = { |
| 'violence': {'name': 'Violence Detection', 'description': 'Detects violent activities in video'}, |
| 'yolo': {'name': 'Object Detection', 'description': 'Detects common objects (people, cars, etc)'}, |
| 'weapon': {'name': 'Weapon Detection', 'description': 'Detects guns, knives and other weapons'}, |
| 'pose': {'name': 'Pose Detection', 'description': 'Detects human poses and body movements'}, |
| 'anomaly': {'name': 'Anomaly Detection (best.bin)', 'description': 'Detects unusual/anomalous behavior patterns'}, |
| 'analysis': {'name': 'Advanced Analysis (Fight/Behavior)', 'description': 'Advanced behavior and fight detection analysis'}, |
| 'lstm': {'name': 'LSTM Sequence Analysis', 'description': 'Temporal behavior analysis using bidirectional LSTM'}, |
| } |
| |
| model_details = {} |
| |
| for model_name in self.models.keys(): |
| model_details[model_name] = { |
| 'name': model_info.get(model_name, {}).get('name', model_name), |
| 'description': model_info.get(model_name, {}).get('description', ''), |
| 'enabled': True |
| } |
| |
| return model_details |
|
|
|
|
| |
| model_manager = ModelManager() |
|
|
| |
| class VideoProcessor: |
| """Processes video frames and applies AI models""" |
| |
| def __init__(self): |
| self.active_stream = None |
| self.processing = False |
| self.frame_index = 0 |
| self.alert_hit_threshold = 3 |
| self.unsafe_hit_streak = 0 |
| self.unsafe_alert_latched = False |
| self.selected_models = None |
| |
| self.person_detection_frames = 0 |
| self.weapon_detection_frames = 0 |
| self.person_consecutive_frames = 0 |
| self.weapon_consecutive_frames = 0 |
| self.weapon_alert_triggered = False |
| |
| self.session_detections = [] |
| self.session_alerts = [] |
| |
| self.lstm_frame_buffer = [] |
| self.lstm_buffer_size = 16 |
| self.lstm_prediction_cache = None |
| self.last_pose_summary = None |
|
|
| def set_selected_models(self, model_list): |
| """Set which models to use for processing (None means all models)""" |
| self.selected_models = model_list if model_list else None |
| |
| def is_model_selected(self, model_name): |
| """Check if a model should be used""" |
| if self.selected_models is None: |
| return True |
| return model_name in self.selected_models |
|
|
| def reset_session_state(self): |
| """Reset temporal detector state before a new video/live analysis session.""" |
| self.frame_index = 0 |
| self.unsafe_hit_streak = 0 |
| self.unsafe_alert_latched = False |
| pose_model = model_manager.get_model('pose') |
| if pose_model: |
| try: |
| pose_model.reset_movement_state() |
| except Exception as e: |
| pass |
| |
| anomaly_model = model_manager.get_model('anomaly') |
| if anomaly_model: |
| try: |
| anomaly_model.reset() |
| except Exception as e: |
| pass |
| |
| |
| self.person_detection_frames = 0 |
| self.weapon_detection_frames = 0 |
| self.person_consecutive_frames = 0 |
| self.weapon_consecutive_frames = 0 |
| |
| self.last_pose_summary = None |
| self.weapon_alert_triggered = False |
| |
| self.session_detections = [] |
| self.session_alerts = [] |
| |
| self.lstm_frame_buffer = [] |
| self.lstm_prediction_cache = None |
|
|
| def _update_alert_streak(self, condition_active, current_streak, is_latched): |
| """Trigger one alert only after N consecutive hits, then wait for reset.""" |
| if condition_active: |
| current_streak += 1 |
| else: |
| return 0, False, False |
|
|
| if current_streak >= self.alert_hit_threshold and not is_latched: |
| return current_streak, True, True |
|
|
| return current_streak, is_latched, False |
|
|
| def _short_text(self, text, max_chars=28): |
| """Trim long text so it fits compact overlay cells.""" |
| if not text: |
| return "None" |
| if len(text) <= max_chars: |
| return text |
| return text[:max_chars - 3] + "..." |
|
|
| def _build_overlay_summary(self, results): |
| """Build top panel summary from frame detections and alerts.""" |
| object_classes = [] |
| weapon_classes = [] |
|
|
| for det in results['detections']: |
| det_type = det.get('type') |
| det_class = det.get('class', 'unknown') |
| if det_type == 'object': |
| object_classes.append(det_class) |
| elif det_type == 'weapon': |
| weapon_classes.append(det_class) |
|
|
| unique_objects = sorted(set(object_classes)) |
| unique_weapons = sorted(set(weapon_classes)) |
|
|
| object_text = ', '.join(unique_objects[:3]) if unique_objects else 'None' |
| weapon_text = ', '.join(unique_weapons[:2]) if unique_weapons else 'None' |
|
|
| behavior_text = 'Normal Activity' |
| pose_summary = results.get('pose', {}) |
| pose_risk = pose_summary.get('risk_level') |
| pose_action = pose_summary.get('action') |
| if pose_risk and pose_risk != 'SAFE': |
| behavior_text = f"{pose_risk.replace('_', ' ')}: {pose_action}" |
| elif any(alert.get('type') == 'violence' for alert in results['alerts']): |
| behavior_text = 'Violence Detected' |
|
|
| if results['alerts']: |
| highest_severity = results['alerts'][0].get('severity', 'ALERT') |
| alert_text = f"{len(results['alerts'])} Active ({highest_severity})" |
| else: |
| alert_text = 'No Active Alerts' |
|
|
| return { |
| 'Detected Objects': self._short_text(object_text), |
| 'Weapon Status': self._short_text(weapon_text), |
| 'Alert System': self._short_text(alert_text), |
| 'Behavior Analysis': self._short_text(behavior_text) |
| } |
|
|
| def _draw_top_info_panel(self, frame, summary): |
| """Draw a compact 2x2 information panel at the top-center of the frame.""" |
| frame_h, frame_w = frame.shape[:2] |
|
|
| panel_w = min(max(int(frame_w * 0.62), 360), max(frame_w - 20, 200)) |
| panel_h = min(max(int(frame_h * 0.20), 110), 170) |
|
|
| x1 = (frame_w - panel_w) // 2 |
| y1 = 10 |
| x2 = x1 + panel_w |
| y2 = y1 + panel_h |
|
|
| overlay = frame.copy() |
| cv2.rectangle(overlay, (x1, y1), (x2, y2), (18, 18, 18), -1) |
| cv2.addWeighted(overlay, 0.65, frame, 0.35, 0, frame) |
|
|
| cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 200, 255), 1) |
|
|
| cell_w = panel_w // 2 |
| cell_h = panel_h // 2 |
|
|
| cv2.line(frame, (x1 + cell_w, y1), (x1 + cell_w, y2), (70, 70, 70), 1) |
| cv2.line(frame, (x1, y1 + cell_h), (x2, y1 + cell_h), (70, 70, 70), 1) |
|
|
| cells = [ |
| ('Detected Objects', summary['Detected Objects'], (80, 220, 80)), |
| ('Weapon Status', summary['Weapon Status'], (70, 200, 255)), |
| ('Alert System', summary['Alert System'], (0, 120, 255)), |
| ('Behavior Analysis', summary['Behavior Analysis'], (160, 180, 255)) |
| ] |
|
|
| for idx, (title, value, color) in enumerate(cells): |
| col = idx % 2 |
| row = idx // 2 |
| tx = x1 + col * cell_w + 10 |
| ty = y1 + row * cell_h + 20 |
|
|
| cv2.putText(frame, title, (tx, ty), cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1) |
| cv2.putText(frame, value, (tx, ty + 22), cv2.FONT_HERSHEY_SIMPLEX, 0.48, (240, 240, 240), 1) |
|
|
| def _draw_person_bounding_boxes(self, frame, detections): |
| """Draw bounding boxes for detected persons with labels""" |
| person_detections = [det for det in detections if det.get('class') == 'person'] |
| |
| for det in person_detections: |
| bbox = det.get('bbox', []) |
| confidence = det.get('confidence', 0.0) |
| |
| if len(bbox) >= 4: |
| x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3]) |
| |
| |
| h, w = frame.shape[:2] |
| x1, y1 = max(0, x1), max(0, y1) |
| x2, y2 = min(w, x2), min(h, y2) |
| |
| |
| color = (0, 255, 0) |
| thickness = 2 |
| cv2.rectangle(frame, (x1, y1), (x2, y2), color, thickness) |
| |
| |
| label = f"Person {confidence:.2f}" |
| label_size, baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) |
| |
| |
| label_top = max(y1 - label_size[1] - 5, 0) |
| cv2.rectangle(frame, (x1, label_top), (x1 + label_size[0], label_top + label_size[1] + 5), color, -1) |
| |
| |
| cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1) |
|
|
| def _severity_rank(self, severity): |
| ranks = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4} |
| return ranks.get((severity or '').upper(), 0) |
|
|
| def _save_detection_screenshot(self, frame, detection_type, alert_level, confidence, details=None): |
| """Save a screenshot of the detection region to database""" |
| try: |
| from flask import session |
|
|
| user_id = session.get('user_id') |
| if not user_id: |
| return None |
| |
| |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') |
| unique_id = str(uuid.uuid4())[:8] |
| filename = f"detection_{detection_type}_{timestamp}_{unique_id}.jpg" |
| |
| |
| upload_dir = UPLOAD_FOLDER / 'detections' |
| upload_dir.mkdir(parents=True, exist_ok=True) |
| |
| |
| filepath = upload_dir / filename |
| cv2.imwrite(str(filepath), frame) |
| |
| |
| detection_record = DetectionHistory( |
| user_id=user_id, |
| detection_type=detection_type, |
| alert_level=alert_level, |
| confidence=confidence, |
| image_filename=filename, |
| detection_details=json.dumps(details) if details else None, |
| detected_at=datetime.now(), |
| session_date=date.today() |
| ) |
| db.session.add(detection_record) |
| db.session.commit() |
| |
| return detection_record.id |
| except Exception as e: |
| pass |
| return None |
|
|
| def _overlay_detections(self, frame, detections): |
| """Draw compact detection boxes for key weapon/object detections.""" |
| colors = { |
| 'weapon': (0, 0, 255), |
| 'object': (0, 200, 0), |
| } |
| for det in detections: |
| bbox = det.get('bbox') |
| if not bbox or len(bbox) != 4: |
| continue |
|
|
| x1, y1, x2, y2 = [int(v) for v in bbox] |
| det_type = det.get('type', 'object') |
| label = f"{det.get('class', det_type)} {det.get('confidence', 0):.2f}" |
| color = colors.get(det_type, (255, 180, 0)) |
|
|
| cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2 if det_type == 'weapon' else 1) |
| (text_w, text_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.45, 1) |
| text_y1 = max(0, y1 - text_h - 8) |
| cv2.rectangle(frame, (x1, text_y1), (x1 + text_w + 8, y1), color, -1) |
| cv2.putText(frame, label, (x1 + 4, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 1) |
|
|
| def _annotate_pose_banner(self, frame, pose_summary): |
| """Draw pose risk banner below the top info panel when pose is available.""" |
| if not pose_summary: |
| return |
|
|
| risk_level = pose_summary.get('risk_level', 'SAFE') |
| action = pose_summary.get('action', 'other') |
| score = pose_summary.get('risk_score', 0.0) |
|
|
| if risk_level == 'HIGH_RISK': |
| color = (0, 0, 220) |
| elif risk_level == 'LOW_RISK': |
| color = (0, 140, 255) |
| else: |
| color = (0, 128, 0) |
|
|
| text = f"Pose Risk: {risk_level} | Action: {action} | Score: {score:.2f}" |
| top = 188 |
| bottom = min(frame.shape[0] - 1, top + 34) |
| cv2.rectangle(frame, (10, top), (frame.shape[1] - 10, bottom), color, -1) |
| cv2.putText(frame, text, (20, top + 23), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) |
|
|
| def _build_frame_summary(self, frame_number, results): |
| """Create a UI-friendly frame-wise summary.""" |
| detections = results['detections'] |
| alerts = results['alerts'] |
| pose_summary = results.get('pose') |
|
|
| highest_alert = max(alerts, key=lambda alert: self._severity_rank(alert.get('severity')), default=None) |
| weapon_names = sorted({det['class'] for det in detections if det.get('type') == 'weapon'}) |
| object_names = sorted({det['class'] for det in detections if det.get('type') == 'object'}) |
|
|
| return { |
| 'frame_number': frame_number, |
| 'weapon_count': len([det for det in detections if det.get('type') == 'weapon']), |
| 'object_count': len([det for det in detections if det.get('type') == 'object']), |
| 'weapon_classes': weapon_names, |
| 'object_classes': object_names[:6], |
| 'pose_risk': pose_summary.get('risk_level', 'UNKNOWN') if pose_summary else 'UNAVAILABLE', |
| 'pose_action': pose_summary.get('action', 'unknown') if pose_summary else 'unknown', |
| 'pose_score': float(pose_summary.get('risk_score', 0.0)) if pose_summary else 0.0, |
| 'alert_state': highest_alert.get('severity', 'SAFE') if highest_alert else 'SAFE', |
| 'alert_message': highest_alert.get('message', 'No active alert') if highest_alert else 'No active alert', |
| 'detections': detections, |
| 'alerts': alerts, |
| } |
|
|
| def process_frame(self, frame): |
| """Process a single frame with selected models""" |
| self.frame_index += 1 |
| results = { |
| 'detections': [], |
| 'alerts': [], |
| 'annotated_frame': frame.copy(), |
| 'pose': None, |
| } |
| frame_result = None |
| |
| |
| if self.is_model_selected('weapon') and 'weapon' in model_manager.models: |
| try: |
| weapon_model = model_manager.get_model('weapon') |
| frame_result = weapon_model.process_frame(frame) |
| |
| |
| for weapon in frame_result.weapons: |
| results['detections'].append({ |
| 'type': 'weapon', |
| 'class': weapon.class_name, |
| 'confidence': float(weapon.confidence), |
| 'bbox': list(weapon.bbox) |
| }) |
| |
| except Exception as e: |
| pass |
|
|
| |
| if 'pose' in model_manager.models: |
| try: |
| pose_model = model_manager.get_model('pose') |
| pose_result = pose_model.predict(frame)[0] |
| movement = pose_model.assess_movement(pose_result) |
| results['pose'] = { |
| 'risk_level': movement.get('risk_level', 'SAFE'), |
| 'action': movement.get('action', 'other'), |
| 'risk_score': float(movement.get('risk_score', 0.0)), |
| 'details': movement.get('details', {}), |
| } |
| except Exception as e: |
| pass |
| |
| |
| if self.is_model_selected('yolo') and 'yolo' in model_manager.models: |
| try: |
| yolo_model = model_manager.get_model('yolo') |
| detections = yolo_model.detect(frame, conf_threshold=DETECTION_THRESHOLDS['yolo']) |
| |
| for det in detections: |
| results['detections'].append({ |
| 'type': 'object', |
| 'class': det.class_name, |
| 'confidence': float(det.confidence), |
| 'bbox': list(det.bbox) |
| }) |
| except Exception as e: |
| pass |
| |
| |
| if self.is_model_selected('violence') and 'violence' in model_manager.models: |
| try: |
| violence_model = model_manager.get_model('violence') |
| violence_result = violence_model.detect_violence(frame, confidence_threshold=DETECTION_THRESHOLDS['violence']) |
| |
| if violence_result.is_violence: |
| results['alerts'].append({ |
| 'type': 'violence', |
| 'severity': violence_result.alert_level, |
| 'confidence': float(violence_result.confidence), |
| 'message': f'Violence detected: {violence_result.class_name}' |
| }) |
| |
| self._save_detection_screenshot( |
| frame, |
| 'violence', |
| violence_result.alert_level, |
| float(violence_result.confidence), |
| {'violence_class': violence_result.class_name} |
| ) |
| except Exception as e: |
| pass |
| |
| |
| if self.is_model_selected('anomaly') and 'anomaly' in model_manager.models: |
| try: |
| anomaly_model = model_manager.get_model('anomaly') |
| anomaly_result = anomaly_model.predict_frame(frame) |
| |
| if anomaly_result is not None: |
| results['detections'].append({ |
| 'type': 'anomaly', |
| 'is_anomaly': anomaly_result.is_anomaly, |
| 'confidence': float(anomaly_result.confidence), |
| 'anomaly_score': float(anomaly_result.anomaly_score), |
| 'alert_level': anomaly_result.alert_level, |
| }) |
| |
| if anomaly_result.is_anomaly: |
| results['alerts'].append({ |
| 'type': 'anomaly_detected', |
| 'severity': anomaly_result.alert_level, |
| 'confidence': float(anomaly_result.confidence), |
| 'message': anomaly_result.description, |
| 'anomaly_score': float(anomaly_result.anomaly_score), |
| }) |
| |
| self._save_detection_screenshot( |
| frame, |
| 'anomaly', |
| anomaly_result.alert_level, |
| float(anomaly_result.confidence), |
| {'anomaly_score': float(anomaly_result.anomaly_score), 'description': anomaly_result.description} |
| ) |
| except Exception as e: |
| pass |
|
|
| has_weapon = any(det.get('type') == 'weapon' for det in results['detections']) |
| has_person = any(det.get('class') == 'person' for det in results['detections']) |
| |
| |
| if has_person: |
| self.person_consecutive_frames += 1 |
| else: |
| self.person_consecutive_frames = 0 |
| |
| if has_weapon: |
| self.weapon_consecutive_frames += 1 |
| else: |
| self.weapon_consecutive_frames = 0 |
| |
| |
| if self.person_consecutive_frames >= 3 and self.person_consecutive_frames == 3: |
| self.person_detection_frames += 1 |
| |
| if self.weapon_consecutive_frames >= 3 and self.weapon_consecutive_frames == 3: |
| self.weapon_detection_frames += 1 |
| |
| |
| results['person_detected'] = has_person |
| results['weapon_detected'] = has_weapon |
| results['person_consecutive_frames'] = self.person_consecutive_frames |
| results['weapon_consecutive_frames'] = self.weapon_consecutive_frames |
| |
| pose_summary = results.get('pose') |
| unsafe_pose = pose_summary and pose_summary.get('risk_level') in {'LOW_RISK', 'HIGH_RISK'} |
|
|
| |
| if self.weapon_detection_frames >= self.alert_hit_threshold and not self.weapon_alert_triggered: |
| self.weapon_alert_triggered = True |
| results['alerts'].append({ |
| 'type': 'weapon_detected', |
| 'severity': 'HIGH', |
| 'message': f'Weapon detected {self.weapon_detection_frames} times in video', |
| 'person_count': frame_result.person_count if frame_result else 0, |
| 'total_weapon_detections': self.weapon_detection_frames, |
| }) |
| |
| weapon_det = next((d for d in results['detections'] if d.get('type') == 'weapon'), None) |
| if weapon_det: |
| self._save_detection_screenshot( |
| frame, |
| 'weapon', |
| 'HIGH', |
| weapon_det.get('confidence', 0.0), |
| {'weapon_class': weapon_det.get('class', 'unknown')} |
| ) |
|
|
| if has_weapon and unsafe_pose: |
| self.unsafe_hit_streak, self.unsafe_alert_latched, unsafe_should_alert = self._update_alert_streak( |
| True, |
| self.unsafe_hit_streak, |
| self.unsafe_alert_latched, |
| ) |
| if unsafe_should_alert: |
| severity = 'CRITICAL' if pose_summary.get('risk_level') == 'HIGH_RISK' else 'HIGH' |
| results['alerts'].append({ |
| 'type': 'weapon_unsafe_pose', |
| 'severity': severity, |
| 'message': ( |
| f"Weapon and unsafe pose confirmed after {self.alert_hit_threshold} hits: " |
| f"{pose_summary.get('action', 'unknown')} ({pose_summary.get('risk_level')})" |
| ), |
| 'pose_action': pose_summary.get('action', 'unknown'), |
| 'pose_risk': pose_summary.get('risk_level', 'SAFE'), |
| 'pose_score': float(pose_summary.get('risk_score', 0.0)), |
| 'hit_count': self.unsafe_hit_streak, |
| }) |
| |
| self._save_detection_screenshot( |
| frame, |
| 'risk', |
| severity, |
| pose_summary.get('risk_score', 0.0), |
| { |
| 'action': pose_summary.get('action', 'unknown'), |
| 'risk_level': pose_summary.get('risk_level', 'SAFE'), |
| 'has_weapon': True |
| } |
| ) |
| else: |
| self.unsafe_hit_streak = 0 |
| self.unsafe_alert_latched = False |
|
|
| |
| self._overlay_detections(results['annotated_frame'], results['detections']) |
| self._draw_person_bounding_boxes(results['annotated_frame'], results['detections']) |
| summary = self._build_overlay_summary(results) |
| self._draw_top_info_panel(results['annotated_frame'], summary) |
| |
| self.last_pose_summary = results.get('pose') |
| |
| |
| if self.is_model_selected('lstm') and 'lstm' in model_manager.models: |
| try: |
| lstm_result = self._process_lstm_frame(results['annotated_frame']) |
| if lstm_result: |
| results['lstm_prediction'] = lstm_result |
| |
| self._annotate_lstm_result(results['annotated_frame'], lstm_result) |
| except Exception as e: |
| pass |
| |
| results['frame_summary'] = self._build_frame_summary(self.frame_index, results) |
| |
| return results |
| |
| def _process_lstm_frame(self, frame): |
| """Process frame through LSTM model for temporal behavior analysis.""" |
| try: |
| |
| h, w = frame.shape[:2] |
| target_size = (64, 64) |
| resized = cv2.resize(frame, target_size) |
| |
| |
| if len(resized.shape) == 3 and resized.shape[2] == 3: |
| rgb_frame = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB) |
| else: |
| rgb_frame = resized |
| |
| |
| normalized = rgb_frame.astype(np.float32) / 255.0 |
| |
| |
| self.lstm_frame_buffer.append(normalized) |
| |
| |
| if len(self.lstm_frame_buffer) > 16: |
| self.lstm_frame_buffer.pop(0) |
| |
| |
| if len(self.lstm_frame_buffer) == 16: |
| lstm_model = model_manager.get_model('lstm') |
| if lstm_model: |
| try: |
| |
| sequence = np.array(self.lstm_frame_buffer) |
| |
| |
| batch_sequence = np.expand_dims(sequence, axis=0) |
| |
| |
| predictions = lstm_model.predict(batch_sequence, verbose=0) |
| |
| |
| if isinstance(predictions, np.ndarray): |
| pred_value = float(np.max(predictions)) |
| pred_class = int(np.argmax(predictions)) |
| |
| |
| behavior_labels = ['normal', 'anomalous', 'violent', 'suspicious'] |
| behavior = behavior_labels[min(pred_class, len(behavior_labels)-1)] |
| |
| |
| self.lstm_prediction_cache = { |
| 'behavior': behavior, |
| 'confidence': min(pred_value, 1.0), |
| 'class': pred_class, |
| 'buffer_size': len(self.lstm_frame_buffer) |
| } |
| |
| return self.lstm_prediction_cache |
| except Exception as lstm_pred_error: |
| pass |
| except Exception as e: |
| pass |
| |
| return None if not self.lstm_prediction_cache else self.lstm_prediction_cache |
| |
| def _annotate_lstm_result(self, frame, lstm_result): |
| """Draw LSTM prediction on frame.""" |
| if not lstm_result: |
| return |
| |
| behavior = lstm_result.get('behavior', 'unknown') |
| confidence = lstm_result.get('confidence', 0.0) |
| |
| |
| color_map = { |
| 'normal': (0, 255, 0), |
| 'anomalous': (0, 165, 255), |
| 'violent': (0, 0, 255), |
| 'suspicious': (0, 165, 255), |
| } |
| |
| color = color_map.get(behavior, (128, 128, 128)) |
| |
| |
| h, w = frame.shape[:2] |
| text = f"LSTM: {behavior.upper()} ({confidence:.2f})" |
| text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0] |
| |
| x = max(0, w - text_size[0] - 10) |
| y = min(h, h - 10) |
| |
| cv2.rectangle(frame, (x-5, y-text_size[1]-5), (x+text_size[0]+5, y+5), color, -1) |
| cv2.putText(frame, text, (x, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) |
|
|
|
|
| video_processor = VideoProcessor() |
|
|
| |
| analysis_jobs = {} |
|
|
| |
| camera_lock = threading.Lock() |
| current_camera = None |
| selected_camera_index = 0 |
|
|
| |
| recording_lock = threading.Lock() |
| is_recording = False |
| video_writer = None |
| recording_filename = None |
| recording_start_time = None |
| last_recorded_frame = None |
|
|
| def enumerate_cameras(max_cameras=10): |
| """Detect all available cameras""" |
| available_cameras = [] |
| |
| for i in range(max_cameras): |
| cap = cv2.VideoCapture(i) |
| if cap.isOpened(): |
| |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| fps = cap.get(cv2.CAP_PROP_FPS) |
| |
| camera_info = { |
| 'index': i, |
| 'name': f"Camera {i}", |
| 'resolution': f"{width}x{height}", |
| 'fps': round(fps, 1), |
| 'available': True |
| } |
| available_cameras.append(camera_info) |
| cap.release() |
| |
| return available_cameras |
|
|
| def generate_camera_frames(): |
| """Generate frames from camera using VideoCapture with motion detection (matches main_cctv.py)""" |
| global current_camera, selected_camera_index |
|
|
| with camera_lock: |
| if current_camera is None: |
| |
| current_camera = VideoCapture(selected_camera_index, use_motion_detection=True) |
| if not current_camera.start(verbose=False): |
| |
| available = enumerate_cameras() |
| if available: |
| |
| for cam in available: |
| current_camera = VideoCapture(cam['index'], use_motion_detection=True) |
| if current_camera.start(verbose=False): |
| selected_camera_index = cam['index'] |
| break |
| current_camera = None |
|
|
| if current_camera is None: |
| return |
|
|
| |
| if current_camera.cap is not None: |
| current_camera.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) |
| current_camera.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) |
|
|
| video_processor.reset_session_state() |
|
|
| frame_counter = 0 |
| process_every_n = 3 |
| last_results = None |
|
|
| try: |
| for original, rois, fg_mask in current_camera.stream_rois(): |
| frame_counter += 1 |
|
|
| |
| if frame_counter % process_every_n == 0 or last_results is None: |
| results = video_processor.process_frame(original) |
| last_results = results |
| annotated_frame = results['annotated_frame'] |
| else: |
| annotated_frame = original.copy() |
| if last_results: |
| video_processor._overlay_detections(annotated_frame, last_results['detections']) |
| video_processor._draw_person_bounding_boxes(annotated_frame, last_results['detections']) |
| summary = video_processor._build_overlay_summary(last_results) |
| video_processor._draw_top_info_panel(annotated_frame, summary) |
|
|
| |
| global is_recording, video_writer, recording_filename, last_recorded_frame |
|
|
| with recording_lock: |
| if is_recording and annotated_frame is not None: |
| |
| if video_writer is None: |
| video_dir = UPLOAD_FOLDER / 'videos' |
| video_dir.mkdir(parents=True, exist_ok=True) |
|
|
| height, width = annotated_frame.shape[:2] |
| fourcc, ext = get_video_codec() |
|
|
| |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') |
| recording_filename = f"video_{timestamp}{ext}" |
| filepath = video_dir / recording_filename |
|
|
| video_writer = cv2.VideoWriter( |
| str(filepath), |
| fourcc, |
| 15.0, |
| (width, height) |
| ) |
|
|
| global recording_start_time |
| recording_start_time = datetime.now() |
|
|
| |
| if video_writer.isOpened(): |
| video_writer.write(annotated_frame) |
| last_recorded_frame = annotated_frame.copy() |
|
|
| |
| ret, buffer = cv2.imencode('.jpg', annotated_frame, [cv2.IMWRITE_JPEG_QUALITY, 75]) |
| if not ret: |
| continue |
| frame_bytes = buffer.tobytes() |
|
|
| yield (b'--frame\r\n' |
| b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n') |
| except GeneratorExit: |
| pass |
|
|
|
|
| |
|
|
| @app.route('/') |
| def home(): |
| """Public homepage with project information""" |
| return render_template('home.html') |
|
|
|
|
| @app.route('/login') |
| def login_page(): |
| """Login/Register page""" |
| if 'user_id' in session: |
| return redirect(url_for('dashboard')) |
| return render_template('login.html') |
|
|
|
|
| @app.route('/index') |
| def index(): |
| """Redirect old index route to login""" |
| return redirect(url_for('login_page')) |
|
|
|
|
| @app.route('/register', methods=['POST']) |
| def register(): |
| """Register new user""" |
| try: |
| data = request.json |
| username = data.get('username') |
| email = data.get('email') |
| password = data.get('password') |
| |
| |
| if not username or not email or not password: |
| return jsonify({'success': False, 'message': 'All fields are required'}), 400 |
| |
| |
| if User.query.filter_by(username=username).first(): |
| return jsonify({'success': False, 'message': 'Username already exists'}), 400 |
| |
| if User.query.filter_by(email=email).first(): |
| return jsonify({'success': False, 'message': 'Email already registered'}), 400 |
| |
| |
| user = User(username=username, email=email) |
| user.set_password(password) |
| db.session.add(user) |
| db.session.commit() |
| |
| return jsonify({'success': True, 'message': 'Registration successful! Please login.'}) |
| |
| except Exception as e: |
| return jsonify({'success': False, 'message': str(e)}), 500 |
|
|
|
|
| @app.route('/login', methods=['POST']) |
| def login(): |
| """Login user""" |
| try: |
| data = request.json |
| username = data.get('username') |
| password = data.get('password') |
| |
| user = User.query.filter_by(username=username).first() |
| |
| if user and user.check_password(password): |
| session.permanent = True |
| session['user_id'] = user.id |
| session['username'] = user.username |
| return jsonify({'success': True, 'message': 'Login successful!'}) |
| else: |
| return jsonify({'success': False, 'message': 'Invalid username or password'}), 401 |
| |
| except Exception as e: |
| return jsonify({'success': False, 'message': str(e)}), 500 |
|
|
|
|
| @app.route('/logout') |
| def logout(): |
| """Logout user""" |
| session.clear() |
| return redirect(url_for('home')) |
|
|
|
|
| @app.route('/dashboard') |
| def dashboard(): |
| """Main dashboard after login""" |
| if 'user_id' not in session: |
| return redirect(url_for('login_page')) |
| |
| return render_template('dashboard.html', username=session.get('username')) |
|
|
|
|
| @app.route('/live-camera') |
| def live_camera(): |
| """Live camera analysis page""" |
| if 'user_id' not in session: |
| return redirect(url_for('login_page')) |
| |
| return render_template('live_camera.html') |
|
|
|
|
| @app.route('/video-analysis') |
| def video_analysis(): |
| """Video upload and analysis page""" |
| if 'user_id' not in session: |
| return redirect(url_for('login_page')) |
| |
| return render_template('video_analysis.html') |
|
|
|
|
| @app.route('/camera_feed') |
| def camera_feed(): |
| """Video streaming route""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| return Response(generate_camera_frames(), |
| mimetype='multipart/x-mixed-replace; boundary=frame') |
|
|
|
|
| @app.route('/processed/<path:filename>') |
| def processed_file(filename): |
| """Serve processed previews and videos.""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| return send_from_directory(app.config['PROCESSED_FOLDER'], filename) |
|
|
|
|
| @app.route('/upload_video', methods=['POST']) |
| def upload_video(): |
| """Upload video for analysis""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| try: |
| if 'video' not in request.files: |
| return jsonify({'success': False, 'message': 'No video file uploaded'}), 400 |
| |
| file = request.files['video'] |
| if file.filename == '': |
| return jsonify({'success': False, 'message': 'No file selected'}), 400 |
| |
| |
| filename = secure_filename(file.filename) |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') |
| filename = f"{timestamp}_{filename}" |
| filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) |
| file.save(filepath) |
| |
| |
| results = process_video_file(filepath) |
| |
| |
| history = AnalysisHistory( |
| user_id=session['user_id'], |
| analysis_type='video', |
| filename=filename, |
| original_filename=file.filename, |
| detections=json.dumps(results.get('detections', [])), |
| alert_count=len(results.get('alerts', [])), |
| detection_count=len(results.get('detections', [])), |
| models_used=json.dumps(session.get('selected_models', [])), |
| processed_video_url=results.get('output_url'), |
| preview_image_url=results.get('preview_url'), |
| emergency_frames=json.dumps(results.get('emergency_frames', [])), |
| total_frames=results.get('total_frames', 0), |
| processed_frames=results.get('processed_frames', 0), |
| frame_summaries=json.dumps(results.get('frame_summaries', [])) |
| ) |
| db.session.add(history) |
| db.session.commit() |
| |
| return jsonify({ |
| 'success': True, |
| 'message': 'Video processed successfully', |
| 'analysis_id': history.id, |
| 'results': results |
| }) |
| |
| except Exception as e: |
| return jsonify({'success': False, 'message': str(e)}), 500 |
|
|
|
|
| def process_video_file(filepath): |
| """Process an uploaded video file with selected models and capture emergency frames""" |
| cap = cv2.VideoCapture(filepath) |
| all_detections = [] |
| all_alerts = [] |
| frame_count = 0 |
| processed_frames = 0 |
| frame_summaries = [] |
| emergency_frames = [] |
| |
| |
| selected_models = session.get('selected_models', []) |
| if selected_models: |
| video_processor.set_selected_models(selected_models) |
|
|
| input_path = Path(filepath) |
| processed_name = f"{input_path.stem}_processed.mp4" |
| preview_name = f"{input_path.stem}_preview.jpg" |
| processed_path = Path(app.config['PROCESSED_FOLDER']) / processed_name |
| preview_path = Path(app.config['PROCESSED_FOLDER']) / preview_name |
| |
| |
| emergency_frames_dir = Path(app.config['PROCESSED_FOLDER']) / 'emergency_frames' |
| emergency_frames_dir.mkdir(parents=True, exist_ok=True) |
|
|
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 640 |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 480 |
| fps = cap.get(cv2.CAP_PROP_FPS) or 20.0 |
| fourcc, ext = get_video_codec() |
| |
| processed_name = f"{input_path.stem}_processed{ext}" |
| processed_path = Path(app.config['PROCESSED_FOLDER']) / processed_name |
| writer = cv2.VideoWriter( |
| str(processed_path), |
| fourcc, |
| fps, |
| (width, height), |
| ) |
|
|
| video_processor.reset_session_state() |
| |
| while cap.isOpened(): |
| ret, frame = cap.read() |
| if not ret: |
| break |
| |
| |
| if frame_count % PROCESSING_PARAMS['frame_skip'] == 0: |
| results = video_processor.process_frame(frame) |
| all_detections.extend(results['detections']) |
| all_alerts.extend(results['alerts']) |
| frame_summaries.append(results['frame_summary']) |
| writer.write(results['annotated_frame']) |
| processed_frames += 1 |
| if processed_frames == 1: |
| cv2.imwrite(str(preview_path), results['annotated_frame']) |
| |
| |
| has_critical_alert = any(alert.get('severity') in {'HIGH', 'CRITICAL'} for alert in results['alerts']) |
| has_weapon = any(det.get('type') == 'weapon' for det in results['detections']) |
| has_violence = any(alert.get('type') == 'violence' for alert in results['alerts']) |
| has_anomaly_alert = any(alert.get('type') == 'anomaly_detected' for alert in results['alerts']) |
| |
| if has_critical_alert or has_weapon or has_violence or has_anomaly_alert: |
| |
| emergency_filename = f"emergency_frame_{processed_frames}_t{frame_count}.jpg" |
| emergency_path = emergency_frames_dir / emergency_filename |
| cv2.imwrite(str(emergency_path), results['annotated_frame']) |
| emergency_frames.append({ |
| 'filename': emergency_filename, |
| 'frame_number': processed_frames, |
| 'timestamp_frame': frame_count, |
| 'alert_type': 'CRITICAL' if has_critical_alert else ('WEAPON' if has_weapon else ('VIOLENCE' if has_violence else 'ANOMALY')), |
| 'has_weapon': has_weapon, |
| 'has_violence': has_violence, |
| 'has_anomaly': has_anomaly_alert |
| }) |
| |
| frame_count += 1 |
| |
| cap.release() |
| writer.release() |
|
|
| risk_counts = {'SAFE': 0, 'LOW_RISK': 0, 'HIGH_RISK': 0, 'UNAVAILABLE': 0} |
| for summary in frame_summaries: |
| risk_counts[summary['pose_risk']] = risk_counts.get(summary['pose_risk'], 0) + 1 |
|
|
| output_url = url_for('processed_file', filename=processed_name) if processed_frames else None |
| preview_url = url_for('processed_file', filename=preview_name) if processed_frames else None |
| output_mime = mimetypes.guess_type(processed_name)[0] or 'video/mp4' |
| |
| return { |
| 'total_frames': frame_count, |
| 'processed_frames': processed_frames, |
| 'detections': all_detections, |
| 'alerts': all_alerts, |
| 'frame_summaries': frame_summaries, |
| 'output_url': output_url, |
| 'output_mime': output_mime, |
| 'preview_url': preview_url, |
| 'emergency_frames': emergency_frames, |
| 'summary': { |
| 'total_detections': len(all_detections), |
| 'total_alerts': len(all_alerts), |
| 'emergency_frames_count': len(emergency_frames), |
| 'weapon_alert_frames': sum(1 for frame in frame_summaries if frame['weapon_count'] > 0), |
| 'unsafe_pose_frames': sum(1 for frame in frame_summaries if frame['pose_risk'] in {'LOW_RISK', 'HIGH_RISK'}), |
| 'critical_frames': sum(1 for frame in frame_summaries if frame['alert_state'] == 'CRITICAL'), |
| 'pose_risk_counts': risk_counts, |
| } |
| } |
|
|
|
|
| @app.route('/api/models') |
| def list_models(): |
| """List all available models""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| models = model_manager.list_models() |
| return jsonify({'models': models}) |
|
|
|
|
| @app.route('/api/available-models') |
| def get_available_models(): |
| """Get detailed information about available models""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| models = model_manager.get_model_details() |
| return jsonify({'models': models}) |
|
|
|
|
| @app.route('/api/set-models', methods=['POST']) |
| def set_active_models(): |
| """Set which models to use for analysis""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| try: |
| data = request.get_json() |
| selected_models = data.get('models', []) |
| |
| |
| session['selected_models'] = selected_models |
| session.modified = True |
| |
| |
| video_processor.set_selected_models(selected_models) |
| |
| return jsonify({ |
| 'success': True, |
| 'message': f'Models updated: {selected_models if selected_models else "All"}', |
| 'selected': selected_models |
| }) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/get-selected-models') |
| def get_selected_models(): |
| """Get currently selected models for the user""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| selected = session.get('selected_models', []) |
| return jsonify({'selected_models': selected}) |
|
|
|
|
| @app.route('/api/live-stats') |
| def get_live_stats(): |
| """Get live monitoring statistics""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| pose_data = {} |
| if video_processor.last_pose_summary: |
| pose_data = { |
| 'risk_level': video_processor.last_pose_summary.get('risk_level', 'SAFE'), |
| 'action': video_processor.last_pose_summary.get('action', 'unknown'), |
| 'risk_score': video_processor.last_pose_summary.get('risk_score', 0.0) |
| } |
| |
| return jsonify({ |
| 'detection_count': session.get('total_detections', 0), |
| 'alert_count': session.get('total_alerts', 0), |
| 'person_detections': video_processor.person_detection_frames, |
| 'weapon_detections': video_processor.weapon_detection_frames, |
| 'person_visible': video_processor.person_consecutive_frames >= 3, |
| 'weapon_visible': video_processor.weapon_consecutive_frames >= 3, |
| 'pose_analysis': pose_data |
| }) |
|
|
|
|
| @app.route('/api/start-camera', methods=['POST']) |
| def start_camera_session(): |
| """Start a camera monitoring session with selected models""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| try: |
| data = request.get_json() |
| selected_models = data.get('selectedModels', []) |
| |
| |
| session['monitoring_session_start'] = datetime.utcnow().isoformat() |
| session['monitoring_models'] = selected_models |
| session.modified = True |
| |
| |
| video_processor.reset_session_state() |
| |
| return jsonify({ |
| 'success': True, |
| 'message': 'Camera monitoring session started', |
| 'models': selected_models |
| }) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/cameras', methods=['GET']) |
| def get_available_cameras(): |
| """Get list of all available cameras""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| try: |
| cameras = enumerate_cameras() |
| return jsonify({ |
| 'success': True, |
| 'cameras': cameras, |
| 'selected': selected_camera_index |
| }) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/select-camera', methods=['POST']) |
| def select_camera(): |
| """Select which camera to use""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| global current_camera, selected_camera_index |
| |
| try: |
| data = request.get_json() |
| camera_index = data.get('camera_index', 0) |
| |
| |
| with camera_lock: |
| if current_camera is not None: |
| current_camera.stop() |
| current_camera = None |
| |
| selected_camera_index = camera_index |
| |
| return jsonify({ |
| 'success': True, |
| 'message': f'Camera {camera_index} selected', |
| 'selected': camera_index |
| }) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| |
| def parse_iso_timestamp(timestamp_str): |
| """Parse ISO format timestamp, handling 'Z' suffix for UTC""" |
| if isinstance(timestamp_str, str): |
| |
| if timestamp_str.endswith('Z'): |
| timestamp_str = timestamp_str[:-1] |
| try: |
| return datetime.fromisoformat(timestamp_str) |
| except (ValueError, TypeError): |
| return datetime.utcnow() |
| elif isinstance(timestamp_str, datetime): |
| return timestamp_str |
| return datetime.utcnow() |
|
|
|
|
| @app.route('/api/save-monitoring-session', methods=['POST']) |
| def save_monitoring_session(): |
| """Save monitoring session results to database with optional video recording""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| try: |
| user_id = session['user_id'] |
| data = request.get_json() |
| |
| |
| recording_filename = data.get('recordingFilename') |
| video_filename = None |
| preview_filename = None |
| |
| if recording_filename: |
| |
| try: |
| sessions_dir = PROCESSED_FOLDER / 'sessions' |
| sessions_dir.mkdir(parents=True, exist_ok=True) |
|
|
| timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S') |
| user_folder = sessions_dir / f"user_{user_id}" |
| user_folder.mkdir(parents=True, exist_ok=True) |
|
|
| |
| source_path = UPLOAD_FOLDER / 'videos' / recording_filename |
| if source_path.exists(): |
| |
| video_filename = f"session_{timestamp}_{uuid.uuid4().hex[:8]}.mp4" |
| dest_path = user_folder / video_filename |
| shutil.copy2(str(source_path), str(dest_path)) |
|
|
| preview_filename = None |
| preview_path = None |
|
|
| |
| try: |
| cap = cv2.VideoCapture(str(dest_path)) |
| ret, frame = cap.read() |
| if ret: |
| preview_filename = f"preview_{timestamp}_{uuid.uuid4().hex[:8]}.jpg" |
| preview_path = user_folder / preview_filename |
| cv2.imwrite(str(preview_path), frame) |
| cap.release() |
| except Exception: |
| pass |
|
|
| |
| live_session = LiveSession( |
| user_id=user_id, |
| session_name=data.get('sessionName', f"Live Session {timestamp}"), |
| video_filename=str(dest_path.relative_to(PROCESSED_FOLDER)), |
| preview_image=str(preview_path.relative_to(PROCESSED_FOLDER)) if preview_path else None, |
| detection_count=data.get('totalDetections', 0), |
| alert_count=data.get('totalAlerts', 0), |
| threat_count=data.get('threatCount', 0), |
| duration=int(data.get('duration', 0)), |
| total_frames=data.get('totalFrames', 0), |
| models_used=json.dumps(data.get('selectedModels', [])), |
| detections_summary=json.dumps(data.get('detectionsSummary', {})), |
| alerts_summary=json.dumps(data.get('alertsSummary', [])), |
| emergency_frames=json.dumps(data.get('emergencyFrames', [])), |
| created_at=parse_iso_timestamp(data.get('startTime')), |
| ended_at=parse_iso_timestamp(data.get('endTime')), |
| is_critical=data.get('isCritical', False) |
| ) |
| db.session.add(live_session) |
| db.session.commit() |
|
|
| return jsonify({ |
| 'success': True, |
| 'message': 'Live session saved successfully', |
| 'session_id': live_session.id, |
| 'video_url': url_for('processed_file', filename=str(dest_path.relative_to(PROCESSED_FOLDER))) |
| }) |
| except Exception: |
| pass |
| |
| |
| |
| video_data = data.get('videoData') |
| if video_data: |
| |
| sessions_dir = PROCESSED_FOLDER / 'sessions' |
| sessions_dir.mkdir(parents=True, exist_ok=True) |
|
|
| |
| timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S') |
| user_folder = sessions_dir / f"user_{user_id}" |
| user_folder.mkdir(parents=True, exist_ok=True) |
|
|
| video_filename = f"session_{timestamp}_{uuid.uuid4().hex[:8]}.mp4" |
| video_path = user_folder / video_filename |
|
|
| |
| try: |
| |
| if video_data.startswith('data:'): |
| video_data = video_data.split(',', 1)[1] |
|
|
| video_bytes = base64.b64decode(video_data) |
| with open(video_path, 'wb') as f: |
| f.write(video_bytes) |
|
|
| preview_filename = None |
| preview_path = None |
|
|
| |
| try: |
| cap = cv2.VideoCapture(str(video_path)) |
| ret, frame = cap.read() |
| if ret: |
| preview_filename = f"preview_{timestamp}_{uuid.uuid4().hex[:8]}.jpg" |
| preview_path = user_folder / preview_filename |
| cv2.imwrite(str(preview_path), frame) |
| cap.release() |
| except Exception: |
| pass |
|
|
| |
| live_session = LiveSession( |
| user_id=user_id, |
| session_name=data.get('sessionName', f"Session {timestamp}"), |
| video_filename=str(video_path.relative_to(PROCESSED_FOLDER)), |
| preview_image=str(preview_path.relative_to(PROCESSED_FOLDER)) if preview_path else None, |
| detection_count=data.get('totalDetections', 0), |
| alert_count=data.get('totalAlerts', 0), |
| threat_count=data.get('threatCount', 0), |
| duration=int(data.get('duration', 0)), |
| total_frames=data.get('totalFrames', 0), |
| models_used=json.dumps(data.get('selectedModels', [])), |
| detections_summary=json.dumps(data.get('detectionsSummary', {})), |
| alerts_summary=json.dumps(data.get('alertsSummary', [])), |
| emergency_frames=json.dumps(data.get('emergencyFrames', [])), |
| created_at=parse_iso_timestamp(data.get('startTime')), |
| ended_at=parse_iso_timestamp(data.get('endTime')), |
| is_critical=data.get('isCritical', False) |
| ) |
| db.session.add(live_session) |
| db.session.commit() |
| |
| return jsonify({ |
| 'success': True, |
| 'message': f'Live session saved successfully', |
| 'session_id': live_session.id, |
| 'video_url': f"/processed/sessions/user_{user_id}/{video_filename}" |
| }) |
| except Exception as e: |
| pass |
| |
| |
| |
| history = AnalysisHistory( |
| user_id=user_id, |
| analysis_type='live', |
| detection_count=data.get('totalDetections', 0), |
| alert_count=data.get('totalAlerts', 0), |
| duration=data.get('duration', 0), |
| models_used=str(data.get('selectedModels', [])), |
| created_at=parse_iso_timestamp(data.get('startTime')), |
| ended_at=parse_iso_timestamp(data.get('endTime')), |
| detections=str(data.get('detections', [])) |
| ) |
| |
| db.session.add(history) |
| db.session.commit() |
| |
| |
| session.pop('monitoring_session_start', None) |
| session.pop('monitoring_models', None) |
| session.modified = True |
| |
| return jsonify({ |
| 'success': True, |
| 'message': f'Monitoring session saved. Detection Count: {data.get("totalDetections", 0)}, Alerts: {data.get("totalAlerts", 0)}', |
| 'session_id': history.id |
| }) |
| except Exception as e: |
| pass |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/save-person-detection', methods=['POST']) |
| def save_person_detection(): |
| """Save person detection data to database""" |
| if 'user_id' not in session: |
| return jsonify({'success': False, 'error': 'Unauthorized'}), 401 |
| |
| try: |
| data = request.get_json() |
| user_id = session['user_id'] |
| |
| |
| person_detection = PersonDetection( |
| user_id=user_id, |
| person_count=data.get('person_count', 0), |
| is_present=data.get('is_present', False), |
| confidence=data.get('confidence', 0.0), |
| detection_details=data.get('detection_details'), |
| detected_at=datetime.utcnow(), |
| session_date=datetime.utcnow().date() |
| ) |
| |
| db.session.add(person_detection) |
| db.session.commit() |
| |
| return jsonify({ |
| 'success': True, |
| 'message': 'Person detection saved', |
| 'detection_id': person_detection.id |
| }) |
| except Exception as e: |
| db.session.rollback() |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/person-detection-history', methods=['GET']) |
| def get_person_detection_history(): |
| """Get person detection history for current user""" |
| if 'user_id' not in session: |
| return jsonify({'success': False, 'error': 'Unauthorized'}), 401 |
| |
| try: |
| user_id = session['user_id'] |
| |
| |
| today = datetime.utcnow().date() |
| detections = PersonDetection.query.filter_by( |
| user_id=user_id, |
| session_date=today |
| ).all() |
| |
| if not detections: |
| return jsonify({ |
| 'success': True, |
| 'data': { |
| 'total_detections': 0, |
| 'currently_present': False, |
| 'detection_records': [] |
| } |
| }) |
| |
| |
| total_detections = sum(d.person_count for d in detections) |
| latest_detection = detections[-1] |
| currently_present = latest_detection.is_present |
| |
| return jsonify({ |
| 'success': True, |
| 'data': { |
| 'total_detections': total_detections, |
| 'currently_present': currently_present, |
| 'latest_confidence': latest_detection.confidence, |
| 'detection_count': len(detections), |
| 'detection_records': [ |
| { |
| 'id': d.id, |
| 'person_count': d.person_count, |
| 'is_present': d.is_present, |
| 'confidence': d.confidence, |
| 'detected_at': d.detected_at.isoformat() |
| } |
| for d in detections |
| ] |
| } |
| }) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/detection-history', methods=['GET']) |
| def get_detection_history(): |
| """Get threat detection history (weapons, unusual activity, risks) and live sessions""" |
| if 'user_id' not in session: |
| return jsonify({'success': False, 'error': 'Unauthorized'}), 401 |
| |
| try: |
| user_id = session['user_id'] |
| limit = request.args.get('limit', 20, type=int) |
| detection_type = request.args.get('type', None) |
| include_sessions = request.args.get('include_sessions', 'true').lower() == 'true' |
| |
| result_data = [] |
| |
| |
| if not detection_type or detection_type != 'session': |
| query = DetectionHistory.query.filter_by(user_id=user_id) |
| |
| if detection_type: |
| query = query.filter_by(detection_type=detection_type) |
| |
| detections = query.order_by(DetectionHistory.detected_at.desc()).limit(limit).all() |
| |
| for d in detections: |
| result_data.append({ |
| 'id': d.id, |
| 'type': 'detection', |
| 'detection_type': d.detection_type, |
| 'alert_level': d.alert_level, |
| 'confidence': d.confidence, |
| 'image_filename': d.image_filename, |
| 'detected_at': d.detected_at.isoformat(), |
| 'details': json.loads(d.detection_details) if d.detection_details else {} |
| }) |
| |
| |
| if include_sessions and (not detection_type or detection_type == 'session'): |
| sessions = LiveSession.query.filter_by(user_id=user_id).order_by(LiveSession.created_at.desc()).limit(limit).all() |
| |
| for live_sess in sessions: |
| result_data.append({ |
| 'id': live_sess.id, |
| 'type': 'session', |
| 'session_name': live_sess.session_name, |
| 'detection_count': live_sess.detection_count, |
| 'alert_count': live_sess.alert_count, |
| 'threat_count': live_sess.threat_count, |
| 'duration': live_sess.duration, |
| 'video_filename': live_sess.video_filename, |
| 'preview_image': live_sess.preview_image, |
| 'is_critical': live_sess.is_critical, |
| 'created_at': live_sess.created_at.isoformat(), |
| 'details': { |
| 'models_used': json.loads(live_sess.models_used) if live_sess.models_used else [], |
| 'emergency_frames': json.loads(live_sess.emergency_frames) if live_sess.emergency_frames else [] |
| } |
| }) |
| |
| |
| result_data.sort(key=lambda x: x.get('detected_at') or x.get('created_at'), reverse=True) |
| |
| return jsonify({ |
| 'success': True, |
| 'data': result_data |
| }) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/detection-image/<filename>') |
| def get_detection_image(filename): |
| """Get detection screenshot image""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| try: |
| |
| user_id = session['user_id'] |
| |
| |
| detection = DetectionHistory.query.filter_by( |
| user_id=user_id, |
| image_filename=filename |
| ).first() |
| |
| if not detection: |
| return jsonify({'error': 'Image not found'}), 404 |
| |
| |
| upload_dir = UPLOAD_FOLDER / 'detections' |
| filepath = upload_dir / filename |
|
|
| if filepath.exists(): |
| return send_from_directory(str(upload_dir), filename) |
| else: |
| return jsonify({'error': 'Image file not found'}), 404 |
| except Exception as e: |
| return jsonify({'error': str(e)}), 400 |
|
|
|
|
| @app.route('/stop_camera', methods=['POST']) |
| def stop_camera(): |
| """Stop the camera stream""" |
| global current_camera |
| |
| with camera_lock: |
| if current_camera is not None: |
| current_camera.stop() |
| current_camera = None |
| |
| return jsonify({'success': True}) |
|
|
|
|
| @app.route('/api/start-recording', methods=['POST']) |
| def start_recording(): |
| """Start video recording""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| global is_recording |
| |
| with recording_lock: |
| if is_recording: |
| return jsonify({'success': False, 'error': 'Recording already in progress'}), 400 |
| |
| is_recording = True |
| |
| return jsonify({ |
| 'success': True, |
| 'message': 'Recording started', |
| 'timestamp': datetime.utcnow().isoformat() |
| }) |
|
|
|
|
| @app.route('/api/stop-recording', methods=['POST']) |
| def stop_recording(): |
| """Stop video recording""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| global is_recording, video_writer, recording_filename, recording_start_time |
| |
| with recording_lock: |
| if not is_recording: |
| return jsonify({'success': False, 'error': 'No recording in progress'}), 400 |
| |
| |
| if video_writer is not None: |
| video_writer.release() |
| video_writer = None |
| |
| is_recording = False |
| |
| duration = 0 |
| if recording_start_time: |
| duration = (datetime.now() - recording_start_time).total_seconds() |
| recording_start_time = None |
| |
| return jsonify({ |
| 'success': True, |
| 'message': 'Recording stopped', |
| 'filename': recording_filename, |
| 'duration': round(duration, 2), |
| 'timestamp': datetime.utcnow().isoformat() |
| }) |
|
|
|
|
| @app.route('/api/recording-status', methods=['GET']) |
| def get_recording_status(): |
| """Get current recording status""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| duration = 0 |
| if is_recording and recording_start_time: |
| duration = (datetime.now() - recording_start_time).total_seconds() |
| |
| return jsonify({ |
| 'success': True, |
| 'is_recording': is_recording, |
| 'filename': recording_filename, |
| 'duration': round(duration, 2), |
| 'start_time': recording_start_time.isoformat() if recording_start_time else None |
| }) |
|
|
|
|
| @app.route('/api/videos', methods=['GET']) |
| def get_videos(): |
| """Get list of recorded videos""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| try: |
| video_dir = UPLOAD_FOLDER / 'videos' |
| if not video_dir.exists(): |
| return jsonify({'success': True, 'videos': []}) |
|
|
| videos = [] |
| all_videos = sorted( |
| list(video_dir.glob('*.mp4')) + list(video_dir.glob('*.avi')), |
| key=lambda f: f.stat().st_mtime, reverse=True |
| ) |
| for video_file in all_videos: |
| stat = video_file.stat() |
| videos.append({ |
| 'filename': video_file.name, |
| 'size': round(stat.st_size / 1024 / 1024, 2), |
| 'created': datetime.fromtimestamp(stat.st_mtime).isoformat(), |
| 'path': f'/api/download-video/{video_file.name}' |
| }) |
|
|
| return jsonify({'success': True, 'videos': videos}) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/images', methods=['GET']) |
| def get_images(): |
| """Get list of captured images""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| try: |
| image_dir = UPLOAD_FOLDER / 'detections' |
| if not image_dir.exists(): |
| return jsonify({'success': True, 'images': []}) |
|
|
| images = [] |
| for image_file in sorted(image_dir.glob('*.jpg'), reverse=True): |
| stat = image_file.stat() |
| images.append({ |
| 'filename': image_file.name, |
| 'size': round(stat.st_size / 1024, 2), |
| 'created': datetime.fromtimestamp(stat.st_mtime).isoformat(), |
| 'path': f'/api/detection-image/{image_file.name}' |
| }) |
|
|
| return jsonify({'success': True, 'images': images}) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/download-video/<filename>', methods=['GET']) |
| def download_video(filename): |
| """Download a recorded video""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| try: |
| |
| if not filename.endswith(('.mp4', '.avi')) or '..' in filename: |
| return jsonify({'error': 'Invalid filename'}), 400 |
|
|
| video_dir = UPLOAD_FOLDER / 'videos' |
| filepath = video_dir / filename |
|
|
| if not filepath.exists(): |
| return jsonify({'error': 'Video not found'}), 404 |
|
|
| return send_from_directory(str(video_dir), filename, as_attachment=True) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/download-image/<filename>', methods=['GET']) |
| def download_image(filename): |
| """Download a captured image""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| |
| try: |
| |
| if not filename.endswith('.jpg') or '..' in filename: |
| return jsonify({'error': 'Invalid filename'}), 400 |
|
|
| image_dir = UPLOAD_FOLDER / 'detections' |
| filepath = image_dir / filename |
|
|
| if not filepath.exists(): |
| return jsonify({'error': 'Image not found'}), 404 |
|
|
| return send_from_directory(str(image_dir), filename, as_attachment=True) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/sessions/<int:session_id>', methods=['DELETE']) |
| def delete_session(session_id): |
| """Delete a live monitoring session and its associated files""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| try: |
| user_id = session['user_id'] |
| live_sess = LiveSession.query.filter_by(id=session_id, user_id=user_id).first() |
| if not live_sess: |
| return jsonify({'success': False, 'error': 'Session not found'}), 404 |
|
|
| |
| for attr, base_dir in [('video_filename', PROCESSED_FOLDER), ('preview_image', PROCESSED_FOLDER)]: |
| fname = getattr(live_sess, attr, None) |
| if fname: |
| fpath = base_dir / fname |
| if fpath.exists(): |
| fpath.unlink() |
|
|
| db.session.delete(live_sess) |
| db.session.commit() |
| return jsonify({'success': True}) |
| except Exception as e: |
| db.session.rollback() |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/videos/<filename>', methods=['DELETE']) |
| def delete_video(filename): |
| """Delete a raw recorded video file""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| try: |
| if '..' in filename or not filename.endswith('.mp4'): |
| return jsonify({'success': False, 'error': 'Invalid filename'}), 400 |
| fpath = UPLOAD_FOLDER / 'videos' / filename |
| if fpath.exists(): |
| fpath.unlink() |
| return jsonify({'success': True}) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/images/<filename>', methods=['DELETE']) |
| def delete_image(filename): |
| """Delete a detection image and its DB record""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
| try: |
| if '..' in filename or not filename.endswith('.jpg'): |
| return jsonify({'success': False, 'error': 'Invalid filename'}), 400 |
| user_id = session['user_id'] |
| record = DetectionHistory.query.filter_by(user_id=user_id, image_filename=filename).first() |
| if record: |
| db.session.delete(record) |
| db.session.commit() |
| fpath = UPLOAD_FOLDER / 'detections' / filename |
| if fpath.exists(): |
| fpath.unlink() |
| return jsonify({'success': True}) |
| except Exception as e: |
| db.session.rollback() |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/video-analysis-history/<int:analysis_id>', methods=['GET']) |
| def get_video_analysis(analysis_id): |
| """Get details of a specific video analysis""" |
| if 'user_id' not in session: |
| return jsonify({'success': False, 'error': 'Unauthorized'}), 401 |
| |
| try: |
| user_id = session['user_id'] |
| analysis = AnalysisHistory.query.filter_by( |
| id=analysis_id, |
| user_id=user_id, |
| analysis_type='video' |
| ).first() |
| |
| if not analysis: |
| return jsonify({'success': False, 'error': 'Analysis not found'}), 404 |
| |
| return jsonify({ |
| 'success': True, |
| 'data': { |
| 'id': analysis.id, |
| 'original_filename': analysis.original_filename, |
| 'uploaded_filename': analysis.filename, |
| 'created_at': analysis.created_at.isoformat(), |
| 'total_frames': analysis.total_frames, |
| 'processed_frames': analysis.processed_frames, |
| 'detection_count': analysis.detection_count, |
| 'alert_count': analysis.alert_count, |
| 'processed_video_url': analysis.processed_video_url, |
| 'preview_image_url': analysis.preview_image_url, |
| 'models_used': json.loads(analysis.models_used) if analysis.models_used else [], |
| 'emergency_frames': json.loads(analysis.emergency_frames) if analysis.emergency_frames else [], |
| 'frame_summaries': json.loads(analysis.frame_summaries) if analysis.frame_summaries else [], |
| 'detections': json.loads(analysis.detections) if analysis.detections else [] |
| } |
| }) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/video-analysis-list', methods=['GET']) |
| def get_video_analysis_list(): |
| """Get list of all video analyses for current user""" |
| if 'user_id' not in session: |
| return jsonify({'success': False, 'error': 'Unauthorized'}), 401 |
| |
| try: |
| user_id = session['user_id'] |
| limit = request.args.get('limit', 20, type=int) |
| |
| analyses = AnalysisHistory.query.filter_by( |
| user_id=user_id, |
| analysis_type='video' |
| ).order_by(AnalysisHistory.created_at.desc()).limit(limit).all() |
| |
| return jsonify({ |
| 'success': True, |
| 'data': [ |
| { |
| 'id': a.id, |
| 'original_filename': a.original_filename, |
| 'created_at': a.created_at.isoformat(), |
| 'detection_count': a.detection_count, |
| 'alert_count': a.alert_count, |
| 'total_frames': a.total_frames, |
| 'processed_frames': a.processed_frames, |
| 'preview_image_url': a.preview_image_url, |
| 'emergency_frames_count': len(json.loads(a.emergency_frames)) if a.emergency_frames else 0 |
| } |
| for a in analyses |
| ] |
| }) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| @app.route('/api/reanalyze-video/<int:analysis_id>', methods=['POST']) |
| def reanalyze_video(analysis_id): |
| """Re-analyze a previously uploaded video with new or same models""" |
| if 'user_id' not in session: |
| return jsonify({'success': False, 'error': 'Unauthorized'}), 401 |
| |
| try: |
| user_id = session['user_id'] |
| analysis = AnalysisHistory.query.filter_by( |
| id=analysis_id, |
| user_id=user_id, |
| analysis_type='video' |
| ).first() |
| |
| if not analysis: |
| return jsonify({'success': False, 'error': 'Analysis not found'}), 404 |
| |
| |
| upload_dir = Path(app.config['UPLOAD_FOLDER']) |
| video_path = upload_dir / analysis.filename |
| |
| if not video_path.exists(): |
| return jsonify({'success': False, 'error': 'Original video file not found'}), 404 |
| |
| |
| results = process_video_file(str(video_path)) |
| |
| |
| new_history = AnalysisHistory( |
| user_id=user_id, |
| analysis_type='video', |
| filename=analysis.filename, |
| original_filename=analysis.original_filename, |
| detections=json.dumps(results.get('detections', [])), |
| alert_count=len(results.get('alerts', [])), |
| detection_count=len(results.get('detections', [])), |
| models_used=json.dumps(session.get('selected_models', [])), |
| processed_video_url=results.get('output_url'), |
| preview_image_url=results.get('preview_url'), |
| emergency_frames=json.dumps(results.get('emergency_frames', [])), |
| total_frames=results.get('total_frames', 0), |
| processed_frames=results.get('processed_frames', 0), |
| frame_summaries=json.dumps(results.get('frame_summaries', [])) |
| ) |
| db.session.add(new_history) |
| db.session.commit() |
| |
| return jsonify({ |
| 'success': True, |
| 'message': 'Video re-analyzed successfully', |
| 'analysis_id': new_history.id, |
| 'results': results |
| }) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 500 |
|
|
|
|
| @app.route('/api/emergency-frames/<int:analysis_id>', methods=['GET']) |
| def get_emergency_frames(analysis_id): |
| """Get emergency frames captured during video analysis""" |
| if 'user_id' not in session: |
| return jsonify({'success': False, 'error': 'Unauthorized'}), 401 |
| |
| try: |
| user_id = session['user_id'] |
| analysis = AnalysisHistory.query.filter_by( |
| id=analysis_id, |
| user_id=user_id, |
| analysis_type='video' |
| ).first() |
| |
| if not analysis: |
| return jsonify({'success': False, 'error': 'Analysis not found'}), 404 |
| |
| emergency_frames = json.loads(analysis.emergency_frames) if analysis.emergency_frames else [] |
| |
| |
| frames_with_urls = [] |
| for frame_info in emergency_frames: |
| frame_data = frame_info.copy() |
| frame_data['image_url'] = url_for('processed_file', filename=f"emergency_frames/{frame_info['filename']}") |
| frames_with_urls.append(frame_data) |
| |
| return jsonify({ |
| 'success': True, |
| 'data': { |
| 'analysis_id': analysis_id, |
| 'total_emergency_frames': len(frames_with_urls), |
| 'frames': frames_with_urls |
| } |
| }) |
| except Exception as e: |
| return jsonify({'success': False, 'error': str(e)}), 400 |
|
|
|
|
| |
|
|
| @app.route('/api/start-video-analysis', methods=['POST']) |
| def start_video_analysis(): |
| """Upload a video and queue it for background analysis. Returns a job_id.""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
|
|
| if 'video' not in request.files: |
| return jsonify({'success': False, 'message': 'No video file uploaded'}), 400 |
|
|
| file = request.files['video'] |
| if not file.filename: |
| return jsonify({'success': False, 'message': 'No file selected'}), 400 |
|
|
| ext = Path(file.filename).suffix.lower() |
| if ext not in {'.mp4', '.avi', '.mov', '.mkv'}: |
| return jsonify({'success': False, 'message': 'Unsupported file type'}), 400 |
|
|
| filename = secure_filename(file.filename) |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') |
| save_name = f"{timestamp}_{filename}" |
| filepath = UPLOAD_FOLDER / save_name |
| file.save(str(filepath)) |
|
|
| selected_models = session.get('selected_models', []) |
| user_id = session['user_id'] |
|
|
| job_id = str(uuid.uuid4()) |
| analysis_jobs[job_id] = { |
| 'status': 'queued', |
| 'progress': 0, |
| 'current_frame': 0, |
| 'total_frames': 0, |
| 'alerts': [], |
| 'high_risk_alerts': [], |
| 'detection_count': 0, |
| 'alert_count': 0, |
| 'results': None, |
| 'analysis_id': None, |
| 'error': None, |
| } |
|
|
| thread = threading.Thread( |
| target=_process_video_background, |
| args=(job_id, str(filepath), user_id, selected_models, file.filename), |
| daemon=True, |
| ) |
| thread.start() |
|
|
| return jsonify({'success': True, 'job_id': job_id}) |
|
|
|
|
| @app.route('/api/analysis-progress/<job_id>') |
| def get_analysis_progress(job_id): |
| """Poll current progress of a background analysis job.""" |
| if 'user_id' not in session: |
| return jsonify({'error': 'Unauthorized'}), 401 |
|
|
| job = analysis_jobs.get(job_id) |
| if not job: |
| return jsonify({'success': False, 'error': 'Job not found'}), 404 |
|
|
| return jsonify({ |
| 'success': True, |
| 'status': job['status'], |
| 'progress': job['progress'], |
| 'current_frame': job['current_frame'], |
| 'total_frames': job['total_frames'], |
| 'alerts': job['alerts'][-30:], |
| 'high_risk_alerts': job['high_risk_alerts'], |
| 'detection_count': job['detection_count'], |
| 'alert_count': job['alert_count'], |
| 'results': job['results'] if job['status'] == 'done' else None, |
| 'analysis_id': job['analysis_id'], |
| 'error': job.get('error'), |
| }) |
|
|
|
|
| def _process_video_background(job_id, filepath, user_id, selected_models, original_filename): |
| """Process a video file in a background thread with real-time job state updates.""" |
| with app.app_context(): |
| job = analysis_jobs.get(job_id) |
| if not job: |
| return |
|
|
| job['status'] = 'processing' |
|
|
| try: |
| |
| local_proc = VideoProcessor() |
| if selected_models: |
| local_proc.set_selected_models(selected_models) |
| local_proc.reset_session_state() |
|
|
| cap = cv2.VideoCapture(filepath) |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 1 |
| fps = cap.get(cv2.CAP_PROP_FPS) or 20.0 |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 640 |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 480 |
|
|
| job['total_frames'] = total_frames |
|
|
| input_path = Path(filepath) |
| fourcc, ext = get_video_codec() |
| processed_name = f"{input_path.stem}_processed{ext}" |
| preview_name = f"{input_path.stem}_preview.jpg" |
| processed_path = PROCESSED_FOLDER / processed_name |
| preview_path = PROCESSED_FOLDER / preview_name |
|
|
| emergency_dir = PROCESSED_FOLDER / 'emergency_frames' |
| emergency_dir.mkdir(parents=True, exist_ok=True) |
| det_dir = UPLOAD_FOLDER / 'detections' |
| det_dir.mkdir(parents=True, exist_ok=True) |
|
|
| writer = cv2.VideoWriter( |
| str(processed_path), |
| fourcc, |
| fps, |
| (width, height), |
| ) |
|
|
| all_detections = [] |
| all_alerts = [] |
| frame_summaries = [] |
| emergency_frames = [] |
| frame_count = 0 |
| processed_frames = 0 |
|
|
| while cap.isOpened(): |
| ret, frame = cap.read() |
| if not ret: |
| break |
|
|
| if frame_count % PROCESSING_PARAMS['frame_skip'] == 0: |
| results = local_proc.process_frame(frame) |
| all_detections.extend(results['detections']) |
| all_alerts.extend(results['alerts']) |
| frame_summaries.append(results['frame_summary']) |
| writer.write(results['annotated_frame']) |
| processed_frames += 1 |
|
|
| if processed_frames == 1: |
| cv2.imwrite(str(preview_path), results['annotated_frame']) |
|
|
| job['detection_count'] = len(all_detections) |
| job['alert_count'] = len(all_alerts) |
|
|
| has_weapon = any(d.get('type') == 'weapon' for d in results['detections']) |
| has_violence = any(a.get('type') == 'violence' for a in results['alerts']) |
| has_critical = any(a.get('severity') in {'HIGH', 'CRITICAL'} for a in results['alerts']) |
|
|
| |
| if has_critical or has_weapon or has_violence: |
| ef_name = f"emrg_{processed_frames}_t{frame_count}.jpg" |
| cv2.imwrite(str(emergency_dir / ef_name), results['annotated_frame']) |
| alert_type = ('CRITICAL' if has_critical else |
| 'WEAPON' if has_weapon else 'VIOLENCE') |
| emergency_frames.append({ |
| 'filename': ef_name, |
| 'frame_number': processed_frames, |
| 'timestamp_frame': frame_count, |
| 'alert_type': alert_type, |
| 'has_weapon': has_weapon, |
| 'has_violence': has_violence, |
| 'has_anomaly': False, |
| }) |
|
|
| |
| for alert in results['alerts']: |
| sev = alert.get('severity', 'LOW') |
| alert_entry = { |
| 'frame': processed_frames, |
| 'type': alert.get('type', 'unknown'), |
| 'severity': sev, |
| 'message': alert.get('message', ''), |
| 'confidence': float(alert.get('confidence', 0.0)), |
| } |
| job['alerts'].append(alert_entry) |
|
|
| if sev in ('HIGH', 'CRITICAL'): |
| job['high_risk_alerts'].append(alert_entry) |
|
|
| atype = alert.get('type', '') |
| if 'weapon' in atype: |
| det_type = 'weapon' |
| elif 'violence' in atype: |
| det_type = 'violence' |
| else: |
| det_type = 'risk' |
|
|
| |
| try: |
| uid8 = str(uuid.uuid4())[:8] |
| ts_str = datetime.now().strftime('%Y%m%d_%H%M%S') |
| det_filename = f"det_{det_type}_{ts_str}_{uid8}.jpg" |
| cv2.imwrite(str(det_dir / det_filename), results['annotated_frame']) |
|
|
| record = DetectionHistory( |
| user_id=user_id, |
| detection_type=det_type, |
| alert_level=sev, |
| confidence=float(alert.get('confidence', 0.0)), |
| image_filename=det_filename, |
| detection_details=json.dumps({ |
| 'alert_type': alert.get('type'), |
| 'message': alert.get('message'), |
| 'frame_number': processed_frames, |
| 'source': 'video_analysis', |
| }), |
| detected_at=datetime.now(), |
| session_date=date.today(), |
| ) |
| db.session.add(record) |
| db.session.commit() |
| except Exception: |
| db.session.rollback() |
|
|
| frame_count += 1 |
| job['current_frame'] = frame_count |
| if total_frames > 0: |
| job['progress'] = min(95, int((frame_count / total_frames) * 95)) |
|
|
| cap.release() |
| writer.release() |
|
|
| output_url = f"/processed/{processed_name}" if processed_frames else None |
| preview_url = f"/processed/{preview_name}" if processed_frames else None |
|
|
| risk_counts = {} |
| for s in frame_summaries: |
| risk_counts[s['pose_risk']] = risk_counts.get(s['pose_risk'], 0) + 1 |
|
|
| |
| |
| slim_summaries = [ |
| {k: v for k, v in s.items() if k not in ('detections', 'alerts')} |
| for s in frame_summaries |
| ] |
|
|
| results_data = { |
| 'total_frames': frame_count, |
| 'processed_frames': processed_frames, |
| 'detections': all_detections, |
| 'alerts': all_alerts, |
| 'frame_summaries': slim_summaries, |
| 'output_url': output_url, |
| 'output_mime': 'video/mp4', |
| 'preview_url': preview_url, |
| 'emergency_frames': emergency_frames, |
| 'summary': { |
| 'total_detections': len(all_detections), |
| 'total_alerts': len(all_alerts), |
| 'emergency_frames_count': len(emergency_frames), |
| 'weapon_alert_frames': sum(1 for f in frame_summaries if f['weapon_count'] > 0), |
| 'unsafe_pose_frames': sum(1 for f in frame_summaries if f['pose_risk'] in {'LOW_RISK', 'HIGH_RISK'}), |
| 'critical_frames': sum(1 for f in frame_summaries if f['alert_state'] == 'CRITICAL'), |
| 'pose_risk_counts': risk_counts, |
| }, |
| } |
|
|
| try: |
| history = AnalysisHistory( |
| user_id=user_id, |
| analysis_type='video', |
| filename=Path(filepath).name, |
| original_filename=original_filename, |
| detections=json.dumps(all_detections), |
| alert_count=len(all_alerts), |
| detection_count=len(all_detections), |
| models_used=json.dumps(selected_models), |
| processed_video_url=output_url, |
| preview_image_url=preview_url, |
| emergency_frames=json.dumps(emergency_frames), |
| total_frames=frame_count, |
| processed_frames=processed_frames, |
| frame_summaries=json.dumps(slim_summaries), |
| ) |
| db.session.add(history) |
| db.session.commit() |
| job['analysis_id'] = history.id |
| except Exception: |
| db.session.rollback() |
|
|
| |
| try: |
| json.dumps(results_data) |
| except TypeError: |
| results_data = json.loads(json.dumps(results_data, default=lambda o: float(o) if hasattr(o, '__float__') else str(o))) |
|
|
| job['results'] = results_data |
| job['status'] = 'done' |
| job['progress'] = 100 |
|
|
| except Exception as e: |
| job['status'] = 'error' |
| job['error'] = str(e) |
| try: |
| db.session.rollback() |
| except Exception: |
| pass |
|
|
|
|
| |
| def _migrate_db(): |
| """Add any missing columns to existing tables (safe for repeated runs).""" |
| from sqlalchemy import text |
| migrations = [ |
| "ALTER TABLE analysis_history ADD COLUMN original_filename VARCHAR(200)", |
| "ALTER TABLE analysis_history ADD COLUMN processed_video_url VARCHAR(500)", |
| "ALTER TABLE analysis_history ADD COLUMN preview_image_url VARCHAR(500)", |
| "ALTER TABLE analysis_history ADD COLUMN emergency_frames TEXT", |
| "ALTER TABLE analysis_history ADD COLUMN total_frames INTEGER DEFAULT 0", |
| "ALTER TABLE analysis_history ADD COLUMN processed_frames INTEGER DEFAULT 0", |
| "ALTER TABLE analysis_history ADD COLUMN frame_summaries TEXT", |
| ] |
| for stmt in migrations: |
| try: |
| db.session.execute(text(stmt)) |
| db.session.commit() |
| except Exception: |
| db.session.rollback() |
|
|
|
|
| with app.app_context(): |
| db.create_all() |
| _migrate_db() |
|
|
|
|
| if __name__ == '__main__': |
| import os |
| |
| |
| banner = """ |
| |
| ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| β β |
| β ββββ βββββββββββββββββββββββββββ ββββββ β |
| β βββββ ββββββββββββββββββββββββββββββββββββ β |
| β ββββββ βββββββββ βββ ββββββββββββββββ β |
| β ββββββββββββββββ βββ ββββββββββββββββ β |
| β βββ ββββββββββββββ βββ βββ ββββββ βββ β |
| β βββ βββββββββββββ βββ βββ ββββββ βββ β |
| β β |
| β AI-Powered Video Surveillance & Analysis System β |
| β β |
| ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| """ |
| print(banner) |
| print("\n" + "β"*70) |
| print(f" π Available Models: {model_manager.list_models()}") |
| print("β"*70) |
| |
| |
| is_hf_spaces = os.getenv('SYSTEM') == 'spaces' or os.path.exists('/.dockerenv') |
| |
| if is_hf_spaces: |
| |
| host = '0.0.0.0' |
| port = 7860 |
| debug = False |
| use_reloader = False |
| print(f"\n π Running on Hugging Face Spaces") |
| else: |
| |
| host = '0.0.0.0' |
| port = int(os.getenv('PORT', 5001)) |
| debug = True |
| use_reloader = True |
| print(f"\n π» Running in Local Development Mode") |
| |
| print(f"\n π Access Application:") |
| print(f" β http://localhost:{port}") |
| print(f" β http://127.0.0.1:{port}") |
| print(f"\n βοΈ Configuration:") |
| print(f" β Debug Mode: {'ON' if debug else 'OFF'}") |
| print(f" β Host: {host} (all interfaces)") |
| print(f" β Port: {port}") |
| print(f" β Environment: {'Hugging Face Spaces' if is_hf_spaces else 'Local Development'}") |
| print(f"\n π Project Root: {PROJECT_ROOT}") |
| print("\n" + "β"*70) |
| print("\n β
Server is RUNNING") |
| print(" Press CTRL+C to stop the server\n") |
| print("β"*70 + "\n") |
| |
| app.run( |
| debug=debug, |
| host=host, |
| port=port, |
| threaded=True, |
| use_reloader=use_reloader |
| ) |