fix: keyframe images, video clips, evidence images, live stream webcam+URL, remove demo mode
"""
DetectifAI Flask Backend - AI-Powered CCTV Surveillance System with Database Integration
Enhanced Flask API for:
- Video upload and processing with DetectifAI security focus
- Real-time processing status and results
- Object detection with fire/weapon recognition
- Security event analysis and threat assessment
- Database integration with MongoDB and FAISS vector search
- User authentication and authorization
- Frontend integration for the surveillance dashboard
"""
import os
import json
import logging
import threading
from datetime import datetime, timedelta, timezone
from uuid import uuid4

import jwt
import numpy as np
from dotenv import load_dotenv
from flask import Flask, request, jsonify, send_file, send_from_directory, g
from flask_cors import CORS
from werkzeug.utils import secure_filename

# Import DetectifAI components
from main_pipeline import CompleteVideoProcessingPipeline
from config import get_security_focused_config, VideoProcessingConfig

# Import database components
from pymongo import MongoClient
from minio import Minio
from minio.error import S3Error
from vector_index import get_faiss_manager, generate_text_embedding, generate_visual_embedding
# Try to import caption search (optional - may not be available)
try:
    from caption_search import get_caption_search_engine
    CAPTION_SEARCH_AVAILABLE = True
except ImportError as e:
    # Module-level logger is not configured yet, so use the root logger here
    logging.warning(f"Caption search not available: {e}")
    CAPTION_SEARCH_AVAILABLE = False
    get_caption_search_engine = None

# Try to import DetectifAI-specific components
try:
    from detectifai_events import DetectifAIEventType, ThreatLevel
    DETECTIFAI_EVENTS_AVAILABLE = True
except ImportError:
    DETECTIFAI_EVENTS_AVAILABLE = False
    logging.warning("DetectifAI events module not available - using basic functionality")
# === Load Environment ===
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY")
MINIO_BUCKET = os.getenv("MINIO_BUCKET")
JWT_SECRET = os.getenv("JWT_SECRET", "defaultsecret")

# Initialize Flask app
app = Flask(__name__)
CORS(app, resources={r"/api/*": {"origins": "*"}})
# Configuration
UPLOAD_FOLDER = 'uploads'
OUTPUT_FOLDER = 'video_processing_outputs'
ALLOWED_EXTENSIONS = {'mp4', 'avi', 'mov', 'mkv', 'wmv', 'flv'}
MAX_CONTENT_LENGTH = 500 * 1024 * 1024  # 500MB max file size
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH

# Create necessary directories (the logs directory must exist before the file handler is attached)
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
os.makedirs('logs', exist_ok=True)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('logs/detectifai_api.log')
    ]
)
logger = logging.getLogger(__name__)
# === MongoDB Atlas Setup ===
mongo = MongoClient(MONGO_URI)
db = mongo.get_default_database()

# Collections from schema
admin = db.admin
user = db.users   # Use 'users' to match database_setup.py
users = db.users  # Alias for clarity
video_file = db.video_file
event = db.event
event_clip = db.event_clip
detected_faces = db.detected_faces
face_matches = db.face_matches
event_description = db.event_description
event_caption = db.event_caption
query = db.query
query_result = db.query_result
subscription_plan = db.subscription_plan
user_subscription = db.user_subscription
# === MinIO Setup ===
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=False
)
try:
    if not minio_client.bucket_exists(MINIO_BUCKET):
        minio_client.make_bucket(MINIO_BUCKET)
except S3Error as err:
    if err.code not in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
        raise

# === FAISS Setup ===
faiss_manager = get_faiss_manager()

# Store processing status in memory (use Redis in production)
processing_status = {}
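# Each processing_status entry is keyed by video_id and holds the fields written by
# upload_video() and process_video_async() below, e.g.:
#   {'video_id': ..., 'filename': ..., 'status': 'queued' | 'processing' | 'completed' | 'failed',
#    'progress': 0-100, 'message': ..., 'uploaded_at': ..., 'config_type': ..., 'results': {...}}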
# === Auth Helpers ===
def generate_jwt(user):
    payload = {
        "user_id": user["user_id"],
        "email": user["email"],
        "role": user.get("role", "user"),
        "exp": datetime.now(timezone.utc) + timedelta(hours=24)
    }
    return jwt.encode(payload, JWT_SECRET, algorithm="HS256")

def decode_jwt(token):
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        return None
    except jwt.InvalidTokenError:
        return None

def auth_required(role=None):
    def decorator(func):
        def wrapper(*args, **kwargs):
            token = request.headers.get("Authorization", "").replace("Bearer ", "")
            if not token:
                return jsonify({"error": "missing token"}), 401
            decoded = decode_jwt(token)
            if not decoded:
                return jsonify({"error": "invalid or expired token"}), 401
            if role and decoded.get("role") != role:
                return jsonify({"error": "unauthorized"}), 403
            g.user = decoded
            return func(*args, **kwargs)
        wrapper.__name__ = func.__name__
        return wrapper
    return decorator
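# Illustrative usage only (route registrations are not shown in this module): handlers that
# read g.user, such as upload_video() below, are expected to be wrapped like
#   @app.route("/api/upload", methods=["POST"])  # hypothetical path
#   @auth_required()
#   def upload_video(): ...
# and admin-only handlers with @auth_required(role="admin").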
def allowed_file(filename):
    """Check if file extension is allowed"""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

def extract_detectifai_results(pipeline_results):
    """Extract DetectifAI-specific results from pipeline output"""
    try:
        detectifai_results = {
            # Basic video metrics
            'video_info': {
                'total_keyframes': pipeline_results['outputs'].get('total_keyframes', 0),
                'processing_time': pipeline_results['processing_stats'].get('total_processing_time', 0),
                'output_directory': pipeline_results['outputs'].get('output_directory', '')
            },
            # Security detection results
            'security_detection': {
                'total_object_detections': pipeline_results['outputs'].get('total_object_detections', 0),
                'total_object_events': pipeline_results['outputs'].get('total_object_events', 0),
                'detectifai_events': pipeline_results['outputs'].get('detectifai_events', 0),
                'fire_detections': 0,  # Will be populated from actual results
                'weapon_detections': 0,
                'security_alerts': []
            },
            # Event analysis
            'event_analysis': {
                'canonical_events': pipeline_results['outputs'].get('canonical_events', 0),
                'total_motion_events': pipeline_results['outputs'].get('total_motion_events', 0),
                'high_priority_events': 0,
                'critical_events': 0
            },
            # Output files
            'output_files': {
                'keyframes_directory': os.path.join(pipeline_results['outputs'].get('output_directory', ''), 'frames'),
                'reports': pipeline_results['outputs'].get('reports', {}),
                'highlight_reels': pipeline_results['outputs'].get('highlight_reels', {}),
                'compressed_video': pipeline_results['outputs'].get('compressed_video', '')
            },
            # System performance
            'performance': {
                'frames_processed': pipeline_results['processing_stats'].get('frames_processed', 0),
                'frames_enhanced': pipeline_results['processing_stats'].get('frames_enhanced', 0),
                'gpu_acceleration': pipeline_results['processing_stats'].get('gpu_used', False)
            }
        }
        return detectifai_results
    except Exception as e:
        logger.error(f"Error extracting DetectifAI results: {e}")
        return {'error': 'Failed to extract results'}
def process_video_async(video_id, video_path, config_type='detectifai', user_id=None):
    """Process video in a background thread with DetectifAI focus and database integration"""
    try:
        processing_status[video_id]['status'] = 'processing'
        processing_status[video_id]['progress'] = 0
        processing_status[video_id]['message'] = 'Initializing DetectifAI processing...'

        # Select configuration with DetectifAI optimizations
        if config_type == 'detectifai' or config_type == 'security':
            config = get_security_focused_config()
            # Removed robbery detection - using security focused config
        elif config_type == 'high_recall':
            try:
                from config import get_high_recall_config
                config = get_high_recall_config()
            except ImportError:
                config = get_security_focused_config()
        elif config_type == 'balanced':
            try:
                from config import get_balanced_config
                config = get_balanced_config()
            except ImportError:
                config = VideoProcessingConfig()
        else:
            config = VideoProcessingConfig()

        # DetectifAI-specific configuration enhancements
        config.enable_object_detection = True
        config.enable_facial_recognition = True
        config.keyframe_extraction_fps = 1.0  # Extract 1 frame per second for surveillance
        config.enable_adaptive_processing = True

        # Set custom output directory for this video
        config.output_base_dir = os.path.join(OUTPUT_FOLDER, video_id)

        # Initialize pipeline
        pipeline = CompleteVideoProcessingPipeline(config)

        # Update progress
        processing_status[video_id]['progress'] = 10
        processing_status[video_id]['message'] = 'Extracting keyframes for security analysis...'

        # Process video with DetectifAI (with error tolerance)
        output_name = os.path.splitext(os.path.basename(video_path))[0]
        results = None
        processing_errors = []
        try:
            results = pipeline.process_video_complete(video_path, output_name)
            logger.info(f"✅ Core pipeline processing completed for {video_id}")
        except Exception as pipeline_error:
            logger.error(f"⚠️ Pipeline error (but continuing): {str(pipeline_error)}")
            processing_errors.append(f"Pipeline: {str(pipeline_error)}")
            # Create minimal results structure
            results = {
                'outputs': {
                    'total_keyframes': 0,
                    'total_events': 0,
                    'total_motion_events': 0,
                    'total_object_events': 0,
                    'total_object_detections': 0,
                    'canonical_events': [],
                    'total_segments': 1,
                    'highlight_reels': {},
                    'reports': {},
                    'compressed_video': ''
                },
                'processing_stats': {'total_processing_time': 0}
            }

        # Extract DetectifAI-specific results (with error tolerance)
        detectifai_results = {}
        try:
            detectifai_results = extract_detectifai_results(results)
        except Exception as extract_error:
            logger.error(f"⚠️ Result extraction error (but continuing): {str(extract_error)}")
            processing_errors.append(f"Extraction: {str(extract_error)}")
            detectifai_results = {'security_detection': {}, 'event_analysis': {}, 'performance': {}}

        # Store results in database
        try:
            # Update video file record with processing results
            video_file.update_one(
                {"video_id": video_id},
                {
                    "$set": {
                        "processing_status": "completed",
                        "processing_results": {
                            "total_keyframes": results['outputs']['total_keyframes'],
                            "total_events": results['outputs']['total_events'],
                            "processing_time": results['processing_stats']['total_processing_time'],
                            "detectifai_results": detectifai_results
                        },
                        "updated_at": datetime.now(timezone.utc)
                    }
                }
            )

            # Create events in database
            for i, canonical_event in enumerate(results['outputs'].get('canonical_events', [])):
                event_doc = {
                    "event_id": str(uuid4()),
                    "video_id": video_id,
                    "start_timestamp_ms": int(canonical_event.get('start_time', 0) * 1000),
                    "end_timestamp_ms": int(canonical_event.get('end_time', 0) * 1000),
                    "confidence_score": canonical_event.get('importance', 0.0),
                    "is_verified": False,
                    "is_false_positive": False,
                    "verified_at": None,
                    "verified_by": None,
                    "visual_embedding": generate_visual_embedding(),
                    "bounding_boxes": canonical_event.get('bounding_boxes', {}),
                    "event_type": canonical_event.get('event_type', 'motion_detection')
                }
                event.insert_one(event_doc)

                # Add to FAISS index
                faiss_manager.add_visual_embedding(event_doc["event_id"], event_doc["visual_embedding"])

                # Create event description
                description_doc = {
                    "description_id": str(uuid4()),
                    "event_id": event_doc["event_id"],
                    "text_embedding": generate_text_embedding(f"Event {i+1}: {canonical_event.get('description', 'Motion detected')}"),
                    "caption": canonical_event.get('description', f'Motion detected at {canonical_event.get("start_time", 0):.2f}s'),
                    "confidence": canonical_event.get('importance', 0.0),
                    "created_at": datetime.now(timezone.utc),
                    "updated_at": datetime.now(timezone.utc)
                }
                event_description.insert_one(description_doc)

                # Add to FAISS text index
                faiss_manager.add_text_embedding(description_doc["description_id"], description_doc["text_embedding"])

            logger.info(f"✅ Database integration completed for {video_id}")
        except Exception as db_error:
            logger.error(f"⚠️ Database integration error (but continuing): {str(db_error)}")
            processing_errors.append(f"Database: {str(db_error)}")

        # Always mark as completed (even with errors)
        processing_status[video_id]['status'] = 'completed'
        processing_status[video_id]['progress'] = 100
        completion_message = 'DetectifAI processing completed successfully!'
        if processing_errors:
            completion_message = f'DetectifAI processing completed with warnings: {len(processing_errors)} non-critical errors'
        processing_status[video_id]['message'] = completion_message
        processing_status[video_id]['results'] = {
            # Original results for backward compatibility
            'total_keyframes': results['outputs']['total_keyframes'],
            'total_events': results['outputs']['total_events'],
            'total_motion_events': results['outputs'].get('total_motion_events', 0),
            'total_object_events': results['outputs'].get('total_object_events', 0),
            'total_object_detections': results['outputs'].get('total_object_detections', 0),
            'canonical_events': results['outputs']['canonical_events'],
            'total_segments': results['outputs']['total_segments'],
            'processing_time': results['processing_stats']['total_processing_time'],
            'highlight_reels': results['outputs'].get('highlight_reels', {}),
            'reports': results['outputs'].get('reports', {}),
            'compressed_video': results['outputs'].get('compressed_video', ''),
            'output_directory': config.output_base_dir,
            'object_detection_enabled': config.enable_object_detection,
            # DetectifAI-specific results
            'detectifai_results': detectifai_results,
            'security_detection': detectifai_results.get('security_detection', {}),
            'event_analysis': detectifai_results.get('event_analysis', {}),
            'performance': detectifai_results.get('performance', {}),
            # Processing status
            'processing_errors': processing_errors,
            'has_warnings': len(processing_errors) > 0
        }
        logger.info(f"Video {video_id} processed successfully")
    except Exception as e:
        logger.error(f"Error processing video {video_id}: {str(e)}")
        processing_status[video_id]['status'] = 'failed'
        processing_status[video_id]['message'] = f'Error: {str(e)}'
        processing_status[video_id]['error'] = str(e)
# === API Endpoints ===
def index():
    return jsonify({"message": "DetectifAI backend running with database integration"})

def health_check():
    """Health check endpoint"""
    return jsonify({'status': 'healthy', 'timestamp': datetime.now().isoformat()})
# === Authentication Endpoints ===
def register():
    data = request.json or {}
    email = data.get("email")
    password = data.get("password")
    username = data.get("username", email.split("@")[0] if email else None)
    if not email or not password:
        return jsonify({"error": "email and password required"}), 400
    if user.find_one({"email": email}):
        return jsonify({"error": "email exists"}), 400
    user_doc = {
        "user_id": str(uuid4()),
        "username": username,
        "email": email,
        "password": password,  # TODO: hash properly
        "role": "user",
        "created_at": datetime.now(timezone.utc),
        "updated_at": datetime.now(timezone.utc),
        "last_login": None
    }
    user.insert_one(user_doc)
    token = generate_jwt(user_doc)
    return jsonify({"token": token})

def login():
    if request.method == "OPTIONS":
        return '', 200  # Handle preflight CORS request
    data = request.json or {}
    email = data.get("email")
    password = data.get("password")
    if not email or not password:
        return jsonify({"error": "email and password required"}), 400
    # Check against Mongo
    user_doc = user.find_one({"email": email})
    if not user_doc or user_doc.get("password") != password:
        return jsonify({"error": "invalid credentials"}), 401
    token = generate_jwt(user_doc)
    return jsonify({
        "message": "login successful",
        "token": token,
        "user": {
            "user_id": user_doc["user_id"],
            "username": user_doc.get("username"),
            "email": user_doc["email"]
        }
    })
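# Illustrative login exchange (route path is hypothetical; registration is analogous):
#   POST /api/login  {"email": "alice@example.com", "password": "..."}
#   -> 200 {"message": "login successful", "token": "<JWT>", "user": {"user_id": ..., "username": ..., "email": ...}}
#   -> 401 {"error": "invalid credentials"} on a bad email/password pair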
# === Admin User Management Endpoints ===
def get_all_users():
    """Get all users - Admin only"""
    try:
        # Get query parameters for pagination and filtering
        page = int(request.args.get("page", 1))
        limit = int(request.args.get("limit", 50))
        search = request.args.get("search", "")
        role_filter = request.args.get("role", "")
        status_filter = request.args.get("status", "")

        # Build the Mongo filter (named to avoid shadowing the module-level `query` collection)
        mongo_query = {}
        if search:
            mongo_query["$or"] = [
                {"email": {"$regex": search, "$options": "i"}},
                {"username": {"$regex": search, "$options": "i"}}
            ]
        if role_filter:
            mongo_query["role"] = role_filter
        if status_filter:
            if status_filter == "active":
                mongo_query["is_active"] = True
            elif status_filter == "inactive":
                mongo_query["is_active"] = False

        # Get total count
        total = users.count_documents(mongo_query)

        # Get users with pagination
        skip = (page - 1) * limit
        user_list = list(users.find(mongo_query).skip(skip).limit(limit).sort("created_at", -1))

        # Remove sensitive data
        for u in user_list:
            u["_id"] = str(u["_id"])
            u.pop("password", None)
            u.pop("password_hash", None)

        return jsonify({
            "users": user_list,
            "total": total,
            "page": page,
            "limit": limit,
            "pages": (total + limit - 1) // limit
        })
    except Exception as e:
        logger.error(f"Error fetching users: {str(e)}")
        return jsonify({"error": "Failed to fetch users"}), 500
def create_user():
    """Create a new user - Admin only"""
    try:
        data = request.json or {}
        email = data.get("email")
        password = data.get("password")
        username = data.get("username") or data.get("name")
        role = data.get("role", "user")
        if not email or not password:
            return jsonify({"error": "email and password required"}), 400

        # Check if user already exists
        if users.find_one({"email": email}):
            return jsonify({"error": "User with this email already exists"}), 400

        # Create user document
        user_doc = {
            "user_id": str(uuid4()),
            "username": username or email.split("@")[0],
            "email": email,
            "password": password,       # TODO: hash properly with bcrypt
            "password_hash": password,  # For compatibility
            "role": role,
            "is_active": True,
            "profile_data": {},
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
            "last_login": None
        }
        users.insert_one(user_doc)

        # Remove sensitive data before returning
        user_doc["_id"] = str(user_doc["_id"])
        user_doc.pop("password", None)
        user_doc.pop("password_hash", None)

        return jsonify({
            "message": "User created successfully",
            "user": user_doc
        }), 201
    except Exception as e:
        logger.error(f"Error creating user: {str(e)}")
        return jsonify({"error": "Failed to create user"}), 500

def get_user(user_id):
    """Get a specific user by ID - Admin only"""
    try:
        user_doc = users.find_one({"user_id": user_id})
        if not user_doc:
            return jsonify({"error": "User not found"}), 404

        # Remove sensitive data
        user_doc["_id"] = str(user_doc["_id"])
        user_doc.pop("password", None)
        user_doc.pop("password_hash", None)
        return jsonify({"user": user_doc})
    except Exception as e:
        logger.error(f"Error fetching user: {str(e)}")
        return jsonify({"error": "Failed to fetch user"}), 500

def update_user(user_id):
    """Update a user - Admin only"""
    try:
        data = request.json or {}
        user_doc = users.find_one({"user_id": user_id})
        if not user_doc:
            return jsonify({"error": "User not found"}), 404

        # Update allowed fields
        update_data = {}
        if "username" in data or "name" in data:
            update_data["username"] = data.get("username") or data.get("name")
        if "email" in data:
            # Check if the new email is already in use
            existing = users.find_one({"email": data["email"], "user_id": {"$ne": user_id}})
            if existing:
                return jsonify({"error": "Email already in use"}), 400
            update_data["email"] = data["email"]
        if "role" in data:
            update_data["role"] = data["role"]
        if "is_active" in data:
            update_data["is_active"] = data["is_active"]
        if "password" in data and data["password"]:
            update_data["password"] = data["password"]
            update_data["password_hash"] = data["password"]
        if not update_data:
            return jsonify({"error": "No valid fields to update"}), 400

        update_data["updated_at"] = datetime.now(timezone.utc)
        users.update_one({"user_id": user_id}, {"$set": update_data})

        # Fetch updated user
        updated_user = users.find_one({"user_id": user_id})
        updated_user["_id"] = str(updated_user["_id"])
        updated_user.pop("password", None)
        updated_user.pop("password_hash", None)

        return jsonify({
            "message": "User updated successfully",
            "user": updated_user
        })
    except Exception as e:
        logger.error(f"Error updating user: {str(e)}")
        return jsonify({"error": "Failed to update user"}), 500

def delete_user(user_id):
    """Delete a user - Admin only"""
    try:
        user_doc = users.find_one({"user_id": user_id})
        if not user_doc:
            return jsonify({"error": "User not found"}), 404

        # Prevent deleting yourself
        current_user = g.user
        if current_user.get("user_id") == user_id:
            return jsonify({"error": "Cannot delete your own account"}), 400

        users.delete_one({"user_id": user_id})
        return jsonify({"message": "User deleted successfully"})
    except Exception as e:
        logger.error(f"Error deleting user: {str(e)}")
        return jsonify({"error": "Failed to delete user"}), 500
# === Video Processing Endpoints ===
def upload_video():
    """Upload video endpoint with database integration"""
    try:
        # Check if a file is present
        if 'video' not in request.files:
            return jsonify({'error': 'No video file provided'}), 400
        file = request.files['video']
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400
        if not allowed_file(file.filename):
            return jsonify({'error': 'Invalid file type. Allowed: mp4, avi, mov, mkv, wmv, flv'}), 400

        # Get processing configuration (default to DetectifAI optimized)
        config_type = request.form.get('config_type', 'detectifai')

        # Generate unique video ID
        video_id = f"video_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{os.urandom(4).hex()}"

        # Save uploaded file
        filename = secure_filename(file.filename)
        video_path = os.path.join(app.config['UPLOAD_FOLDER'], f"{video_id}_{filename}")
        file.save(video_path)

        # Get file size from the saved file (the upload stream has already been consumed by save())
        file_size = os.path.getsize(video_path)

        # Store in MinIO using standardized paths
        from minio_config import VIDEOS_BUCKET, get_minio_paths
        minio_paths = get_minio_paths(video_id, filename)
        object_name = minio_paths["original"]
        try:
            with open(video_path, 'rb') as file_data:
                minio_client.put_object(
                    VIDEOS_BUCKET,
                    object_name,
                    file_data,
                    file_size,
                    content_type='video/mp4'
                )
            logger.info(f"✅ Video uploaded to MinIO: {object_name}")
        except Exception as e:
            logger.error(f"❌ MinIO upload failed: {e}")
            raise

        # Create video record in database
        video_doc = {
            "video_id": video_id,
            "user_id": g.user.get("user_id"),
            "file_path": video_path,
            "minio_object_key": object_name,
            "minio_bucket": VIDEOS_BUCKET,  # bucket the object was actually written to
            "codec": None,
            "fps": None,
            "upload_date": datetime.now(timezone.utc),
            "duration_secs": None,
            "file_size_bytes": file_size,
            "meta_data": {},
            "processing_status": "uploaded"
        }
        video_file.insert_one(video_doc)

        # Initialize processing status
        processing_status[video_id] = {
            'video_id': video_id,
            'filename': filename,
            'status': 'queued',
            'progress': 0,
            'message': 'Video uploaded successfully. Processing queued.',
            'uploaded_at': datetime.now().isoformat(),
            'config_type': config_type
        }

        # Start background processing
        thread = threading.Thread(
            target=process_video_async,
            args=(video_id, video_path, config_type, g.user.get("user_id"))
        )
        thread.daemon = True
        thread.start()

        return jsonify({
            'success': True,
            'video_id': video_id,
            'message': 'Video uploaded successfully. Processing started.',
            'status_url': f'/api/status/{video_id}'
        }), 200
    except Exception as e:
        logger.error(f"Upload error: {str(e)}")
        return jsonify({'error': str(e)}), 500
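# Upload contract implemented above: a multipart/form-data POST with the file under the field
# name 'video' and an optional 'config_type' ('detectifai', 'security', 'high_recall', or
# 'balanced'); the response carries the generated video_id plus a status_url of the form
# /api/status/<video_id> for polling get_status() below.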
def get_status(video_id):
    """Get processing status for a video"""
    # Check memory first
    if video_id in processing_status:
        return jsonify(processing_status[video_id]), 200

    # Check database for video record
    video_doc = video_file.find_one({"video_id": video_id})
    if video_doc:
        status = {
            'video_id': video_id,
            'filename': video_doc.get('file_path', '').split('/')[-1],
            'status': video_doc.get('processing_status', 'unknown'),
            'progress': 100 if video_doc.get('processing_status') == 'completed' else 0,
            'message': f"Video status: {video_doc.get('processing_status', 'unknown')}",
            'uploaded_at': video_doc.get('upload_date', '').isoformat() if video_doc.get('upload_date') else '',
            'results': video_doc.get('processing_results', {})
        }
        return jsonify(status), 200
    return jsonify({'error': 'Video not found'}), 404
# === Database Query Endpoints ===
def list_videos():
    """List all videos for the authenticated user"""
    user_id = g.user.get("user_id")
    vids = list(video_file.find({"user_id": user_id}, {"_id": 0}))
    return jsonify(vids)

def get_video(video_id):
    """Get specific video details"""
    user_id = g.user.get("user_id")
    vid = video_file.find_one({"video_id": video_id, "user_id": user_id}, {"_id": 0})
    if not vid:
        return jsonify({"error": "not found"}), 404
    return jsonify(vid)

def get_video_events(video_id):
    """Get events for a specific video"""
    user_id = g.user.get("user_id")
    # Verify the user owns the video
    video_doc = video_file.find_one({"video_id": video_id, "user_id": user_id})
    if not video_doc:
        return jsonify({"error": "video not found or access denied"}), 404
    events_list = list(event.find({"video_id": video_id}, {"_id": 0}))
    return jsonify(events_list)

def get_event_details(event_id):
    """Get event details with descriptions"""
    event_doc = event.find_one({"event_id": event_id}, {"_id": 0})
    if not event_doc:
        return jsonify({"error": "event not found"}), 404
    # Get descriptions for this event
    descriptions = list(event_description.find({"event_id": event_id}, {"_id": 0}))
    event_doc["descriptions"] = descriptions
    return jsonify(event_doc)

# === Search Endpoints ===
def search():
    """Simple text search in event descriptions"""
    q = request.args.get("q", "")
    user_id = g.user.get("user_id")
    # Get the user's videos first
    user_videos = [v["video_id"] for v in video_file.find({"user_id": user_id}, {"video_id": 1})]
    # Search descriptions belonging to the user's videos
    matches = list(event_description.find({
        "caption": {"$regex": q, "$options": "i"},
        "event_id": {"$in": [e["event_id"] for e in event.find({"video_id": {"$in": user_videos}}, {"event_id": 1})]}
    }, {"_id": 0}))
    return jsonify(matches)
def search_vector():
    """Vector search for similar text embeddings using FAISS"""
    data = request.json or {}
    query_text = data.get("query_text")
    k = data.get("k", 10)  # Number of results to return
    if not query_text:
        return jsonify({"error": "query_text is required"}), 400
    try:
        # Generate embedding for the query text
        query_embedding = generate_text_embedding(query_text)
        # Search FAISS index
        results = faiss_manager.search_text_embeddings(query_embedding, k)
        return jsonify({
            "query_text": query_text,
            "results": results,
            "total_results": len(results)
        })
    except Exception as e:
        return jsonify({"error": f"Search failed: {str(e)}"}), 500

def search_visual():
    """Vector search for similar visual embeddings using FAISS"""
    data = request.json or {}
    query_embedding = data.get("query_embedding")
    k = data.get("k", 10)  # Number of results to return
    if not query_embedding:
        return jsonify({"error": "query_embedding is required"}), 400
    if not isinstance(query_embedding, list):
        return jsonify({"error": "query_embedding must be a list of floats"}), 400
    try:
        # Search FAISS index
        results = faiss_manager.search_visual_embeddings(query_embedding, k)
        return jsonify({
            "query_embedding_dim": len(query_embedding),
            "results": results,
            "total_results": len(results)
        })
    except Exception as e:
        return jsonify({"error": f"Visual search failed: {str(e)}"}), 500
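# Expected JSON bodies for the two FAISS search handlers above (query values are placeholders):
#   search_vector(): {"query_text": "person near loading dock", "k": 10}
#   search_visual(): {"query_embedding": [0.12, -0.03, ...], "k": 10}
# Both return {"results": [...], "total_results": N} on success.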
def search_captions():
    """Search captions using FAISS index and sentence transformers"""
    try:
        if not CAPTION_SEARCH_AVAILABLE:
            return jsonify({
                "error": "Caption search not available",
                "message": "Caption search module not installed or not available"
            }), 503

        data = request.json or {}
        query_text = data.get("query", "").strip()
        top_k = data.get("top_k", 10)
        min_score = data.get("min_score", 0.0)
        if not query_text:
            return jsonify({"error": "query is required"}), 400

        # Get caption search engine
        search_engine = get_caption_search_engine()
        if not search_engine or not search_engine.is_ready():
            return jsonify({
                "error": "Caption search engine not ready",
                "stats": search_engine.get_stats() if search_engine else {}
            }), 503

        # Perform search
        results = search_engine.search(query_text, top_k=top_k, min_score=min_score)

        # Format results for the frontend
        formatted_results = []
        for result in results:
            video_ref = result.get("video_reference", {})
            minio_path = video_ref.get("minio_path", "")
            object_name = video_ref.get("object_name", "")

            # Generate a MinIO URL for the image/video
            image_url = None
            if object_name:
                try:
                    bucket = video_ref.get("bucket", "nlp-images")
                    # Create the bucket if it doesn't exist
                    try:
                        if not minio_client.bucket_exists(bucket):
                            logger.info(f"Creating MinIO bucket: {bucket}")
                            minio_client.make_bucket(bucket)
                    except S3Error as e:
                        if e.code not in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
                            logger.warning(f"Could not create bucket {bucket}: {e}")
                    # Generate a presigned URL for the MinIO object (valid for 1 hour)
                    image_url = minio_client.presigned_get_object(
                        bucket,
                        object_name,
                        expires=timedelta(hours=1)
                    )
                except Exception as e:
                    logger.warning(f"Could not generate MinIO URL: {e}")
                    # Fallback: use the unified image serving endpoint
                    bucket = video_ref.get("bucket", "nlp-images")
                    image_url = f"/api/minio/image/{bucket}/{object_name}"

            formatted_result = {
                "id": result.get("description_id"),
                "event_id": result.get("event_id"),
                "description": result.get("caption", ""),
                "caption": result.get("caption", ""),
                "confidence": result.get("confidence", 0.0),
                "similarity_score": result.get("similarity_score", 0.0),
                "thumbnail": image_url,
                "video_reference": video_ref,
                "timestamp": result.get("created_at"),
                "zone": "N/A"  # Can be enhanced with actual zone data
            }
            formatted_results.append(formatted_result)

        return jsonify({
            "query": query_text,
            "results": formatted_results,
            "total_results": len(formatted_results),
            "stats": search_engine.get_stats()
        })
    except Exception as e:
        logger.error(f"Error in caption search: {e}")
        return jsonify({"error": f"Search failed: {str(e)}"}), 500
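# search_captions() expects a JSON body such as {"query": "red car", "top_k": 10, "min_score": 0.0}
# and responds with {"query": ..., "results": [...], "total_results": N, "stats": {...}};
# each result carries a presigned MinIO thumbnail URL when one can be generated, otherwise the
# /api/minio/image/<bucket>/<object> fallback path.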
# === FAISS Management Endpoints ===
def rebuild_indices():
    """Rebuild FAISS indices from MongoDB data"""
    try:
        # Rebuild both indices
        faiss_manager.rebuild_text_index()
        faiss_manager.rebuild_visual_index()
        # Get updated stats
        stats = faiss_manager.get_index_stats()
        return jsonify({
            "message": "Indices rebuilt successfully",
            "stats": stats
        })
    except Exception as e:
        return jsonify({"error": f"Failed to rebuild indices: {str(e)}"}), 500

def get_index_stats():
    """Get statistics about FAISS indices"""
    try:
        stats = faiss_manager.get_index_stats()
        return jsonify(stats)
    except Exception as e:
        return jsonify({"error": f"Failed to get index stats: {str(e)}"}), 500
# === Legacy DetectifAI Endpoints (for backward compatibility) ===
def get_results(video_id):
    """Get processing results for a video"""
    if video_id not in processing_status:
        return jsonify({'error': 'Video not found'}), 404
    status = processing_status[video_id]
    if status['status'] != 'completed':
        return jsonify({
            'error': 'Processing not completed',
            'current_status': status['status']
        }), 400
    return jsonify(status.get('results', {})), 200

def get_video_results(video_id):
    """Get video processing results with availability flags"""
    # First check the in-memory status
    if video_id in processing_status:
        status = processing_status[video_id]
        if status['status'] != 'completed':
            return jsonify({
                'error': 'Processing not completed',
                'current_status': status['status']
            }), 400
        # Check if the status has a results structure (normal processing)
        if 'results' in status and 'output_directory' in status['results']:
            output_dir = status['results']['output_directory']
        else:
            # Fall back to the standard directory structure
            output_dir = os.path.join('video_processing_outputs', video_id)
    else:
        # Check database for video record
        video_doc = video_file.find_one({"video_id": video_id})
        if not video_doc:
            return jsonify({'error': 'Video not found'}), 404
        output_dir = os.path.join('video_processing_outputs', video_id)
        if not os.path.exists(output_dir):
            return jsonify({'error': 'Video processing results not found'}), 404
        logger.info(f"📁 Found video files on disk for {video_id}, recovering results")

    # Check for compressed video
    compressed_dir = os.path.join(output_dir, 'compressed')
    compressed_video_available = False
    compressed_video_url = None
    if os.path.exists(compressed_dir):
        video_files = [f for f in os.listdir(compressed_dir) if f.endswith('.mp4')]
        if video_files:
            compressed_video_available = True
            compressed_video_url = f'/api/video/compressed/{video_id}'

    # Check for keyframes
    frames_dir = os.path.join(output_dir, 'frames')
    keyframes_available = os.path.exists(frames_dir) and len([f for f in os.listdir(frames_dir) if f.endswith('.jpg')]) > 0
    keyframes_count = len([f for f in os.listdir(frames_dir) if f.endswith('.jpg')]) if keyframes_available else 0

    # Check for reports
    reports_dir = os.path.join(output_dir, 'reports')
    reports_available = os.path.exists(reports_dir)
    report_files = []
    if reports_available:
        report_files = [f for f in os.listdir(reports_dir) if f.endswith('.json')]

    return jsonify({
        'video_id': video_id,
        'compressed_video_available': compressed_video_available,
        'compressed_video_url': compressed_video_url,
        'keyframes_available': keyframes_available,
        'keyframes_count': keyframes_count,
        'keyframes_url': f'/api/video/keyframes/{video_id}',
        'reports_available': reports_available,
        'reports': report_files
    }), 200
# === File Serving Endpoints ===
def get_keyframes(video_id):
    """Get list of extracted keyframes with DetectifAI annotations"""
    if video_id not in processing_status:
        return jsonify({'error': 'Video not found'}), 404
    status = processing_status[video_id]
    if status['status'] != 'completed':
        return jsonify({'error': 'Processing not completed'}), 400

    output_dir = status['results']['output_directory']
    frames_dir = os.path.join(output_dir, 'frames')
    if not os.path.exists(frames_dir):
        return jsonify({'error': 'Frames directory not found'}), 404

    # Load detection metadata if available
    detection_metadata = {}
    detection_metadata_path = os.path.join(output_dir, 'detection_metadata.json')
    if os.path.exists(detection_metadata_path):
        try:
            with open(detection_metadata_path, 'r') as f:
                detection_metadata = json.load(f)
        except Exception as e:
            logger.warning(f"Could not load detection metadata: {e}")

    # Get filter parameter
    filter_detections = request.args.get('filter_detections', 'false').lower() == 'true'

    keyframes = []
    frames_with_detections = {item['original_path']: item for item in detection_metadata.get('detection_summary', [])}
    for filename in sorted(os.listdir(frames_dir)):
        if filename.endswith('.jpg') and not filename.endswith('_annotated.jpg'):
            # Extract timestamp from filename
            timestamp = 0.0
            try:
                if '_' in filename:
                    timestamp_part = filename.split('_')[1].replace('s', '').replace('.jpg', '')
                    timestamp = float(timestamp_part)
            except (ValueError, IndexError):
                pass

            frame_path = os.path.join(frames_dir, filename)
            has_detections = frame_path in frames_with_detections

            # Skip frames without detections if filtering is enabled
            if filter_detections and not has_detections:
                continue

            keyframe_data = {
                'filename': filename,
                'timestamp': timestamp,
                'url': f'/api/keyframe/{video_id}/{filename}',
                'has_detections': has_detections
            }

            # Add detection details if available
            if has_detections:
                detection_info = frames_with_detections[frame_path]
                keyframe_data.update({
                    'detection_count': detection_info.get('detection_count', 0),
                    'objects': detection_info.get('objects', []),
                    'confidence_avg': detection_info.get('confidence_avg', 0.0)
                })
            keyframes.append(keyframe_data)

    return jsonify({
        'video_id': video_id,
        'total_keyframes': detection_metadata.get('total_keyframes', len(keyframes)),
        'keyframes_with_detections': detection_metadata.get('frames_with_detections', 0),
        'keyframes': keyframes,
        'objects_detected': detection_metadata.get('objects_detected', {}),
        'filter_applied': filter_detections
    }), 200
def get_keyframe_image(video_id, filename):
    """Serve a keyframe image"""
    if video_id not in processing_status:
        return jsonify({'error': 'Video not found'}), 404
    status = processing_status[video_id]
    output_dir = status['results']['output_directory']
    frames_dir = os.path.join(output_dir, 'frames')
    return send_from_directory(frames_dir, filename)

def get_compressed_video(video_id):
    """Serve the compressed video"""
    if video_id not in processing_status:
        return jsonify({'error': 'Video not found'}), 404
    status = processing_status[video_id]
    if status['status'] != 'completed':
        return jsonify({'error': 'Processing not completed'}), 400
    output_dir = status['results']['output_directory']
    compressed_dir = os.path.join(output_dir, 'compressed')
    if not os.path.exists(compressed_dir):
        return jsonify({'error': 'Compressed video directory not found'}), 404
    # Find the compressed video file
    video_files = [f for f in os.listdir(compressed_dir) if f.endswith('.mp4')]
    if not video_files:
        return jsonify({'error': 'Compressed video file not found'}), 404
    # Use the first video file found (there should only be one)
    video_filename = video_files[0]
    return send_from_directory(compressed_dir, video_filename)

if __name__ == '__main__':
    logger.info("Starting DetectifAI Flask API server with database integration...")
    app.run(host='0.0.0.0', port=5000, debug=True)