import os import cv2 import torch import numpy as np import time from datetime import datetime import threading import base64 from werkzeug.utils import secure_filename # --- Load .env file --- from dotenv import load_dotenv load_dotenv() # --- END --- from flask import Flask, render_template, Response, request, jsonify from flask_socketio import SocketIO # Important: Adjust imports if your structure changed from utils.load_model import load_models from utils.utils import build_transforms from network.TorchUtils import get_torch_device from yolo_detection import analyze_video_with_yolo from video_sumrrizer import summarize_video # Your summarizer # ---- App Setup ---- app = Flask(__name__) app.config['SECRET_KEY'] = 'your_secret_key!' UPLOAD_FOLDER = 'uploads' os.makedirs(UPLOAD_FOLDER, exist_ok=True) app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER socketio = SocketIO(app, async_mode='eventlet') # ---- Global Config & Model Loading ---- print("[INFO] Loading models...") DEVICE = get_torch_device() FEATURE_EXTRACTOR_PATH = "models/c3d.pickle" AD_MODEL_PATH = "models/epoch_80000.pt" YOLO_MODEL_PATH = "models/yolo_my_model.pt" SAVE_DIR = "outputs/anomaly_frames" ANOMALY_THRESHOLD = 0.4 COOLDOWN_SECS = 60.0 # <--- This enforces the 60 second anomaly cooldown os.makedirs(SAVE_DIR, exist_ok=True) OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") if not OPENAI_API_KEY: print("โš ๏ธ WARNING: OPENAI_API_KEY not set!") anomaly_detector, feature_extractor = load_models( FEATURE_EXTRACTOR_PATH, AD_MODEL_PATH, features_method="c3d", device=DEVICE) feature_extractor.eval(); anomaly_detector.eval() TRANSFORMS = build_transforms(mode="c3d") print("[INFO] Models loaded successfully.") VIDEO_PATHS = { # Ensure these paths are correct relative to app.py "Abuse": "static/videos/Abuse.mp4", "Arrest": "static/videos/Arrest.mp4", "Arson": "static/videos/Arson.mp4", "Assault": "static/videos/Assault.mp4", "Burglary": "static/videos/Burglary.mp4", "Explosion": "static/videos/Explosion.mp4", "Fighting": "static/videos/Fighting.mp4", "RoadAccidents": "static/videos/RoadAccidents.mp4", "Robbery": "static/videos/Robbery.mp4", "Shooting": "static/videos/Shooting.mp4", "Shoplifting": "static/videos/Shoplifting.mp4", "Stealing": "static/videos/Stealing.mp4", "Vandalism": "static/videos/Vandalism.mp4", "Normal": "static/videos/Normal.mp4" } # --- Thread control --- thread = None; thread_lock = threading.Lock(); stop_event = threading.Event() def smooth_score(scores, new_score, window=5): scores.append(new_score); scores = scores[-window:]; return float(np.mean(scores)) # --- MODIFIED: Renamed and removed internal wait --- def _save_clip_and_start_summarizer(video_path, clip_dir, initial_frames, fps, width, height): """ Saves a 30s clip, then immediately starts the summarizer task. """ out_path = os.path.join(clip_dir, "anomaly_clip.mp4") save_success = False try: socketio.emit('update_status', {'status': 'Saving 30s clip...'}) cap2 = cv2.VideoCapture(video_path) if not cap2.isOpened(): raise Exception(f"Cannot open video {video_path}") fourcc = cv2.VideoWriter_fourcc(*"mp4v") out = cv2.VideoWriter(out_path, fourcc, fps, (width, height)) for f_idx, f in enumerate(initial_frames): out.write(f) if f_idx % 5 == 0: socketio.sleep(0.001) frames_to_save = int(fps * 30) remaining = frames_to_save - len(initial_frames) for i in range(remaining): ret, frame = cap2.read() if not ret: break out.write(frame) if i % 5 == 0: socketio.sleep(0.001) out.release(); cap2.release() print(f"โœ… [INFO] Saved 30-sec clip -> {out_path}") save_success = True except Exception as e: print(f"โŒ [ERROR] _save_clip failed: {e}") socketio.emit('update_summary', {'summary': f"Error saving clip: {e}"}) socketio.emit('recording_signal', {'recording': False}) finally: # --- If saving succeeded, start summarizer immediately --- if save_success: try: print("๐Ÿš€ Starting background task for summarization...") socketio.emit('update_status', {'status': 'Clip saved. Starting summarizer...'}) # Start summarizer in a *new* background task socketio.start_background_task(_run_summarizer, out_path) except Exception as e: print(f"โŒ [ERROR] Failed to start summarizer task: {e}") socketio.emit('update_summary', {'summary': f"Error starting summarizer: {e}"}) socketio.emit('recording_signal', {'recording': False}) # --- End --- # --- Background Task 2 - Just Run Summarizer (Unchanged) --- def _run_summarizer(saved_clip_path): """ Runs the summarizer on the already saved clip. Emits results and recording=False. """ summary_text = None try: print(f"โณ [INFO] Summarizing clip: {saved_clip_path}") socketio.emit('update_status', {'status': 'Summarizing clip...'}) socketio.sleep(0.01) # Yield before blocking call api_key_from_env = os.getenv("OPENAI_API_KEY") if not api_key_from_env: raise ValueError("OPENAI_API_KEY missing.") summary_text = summarize_video(video_path=saved_clip_path, api=api_key_from_env) print("\nโœ… VIDEO SUMMARY (snippet):\n", summary_text[:100] + "...", "\n") except Exception as e: summary_text = f"Summarizer Error: {e}" print(f"โŒ [ERROR] _run_summarizer failed: {e}") finally: if summary_text is None: summary_text = "Summarizer Error: Unknown failure." socketio.emit('update_summary', {'summary': summary_text}) socketio.emit('recording_signal', {'recording': False}) # Signal recording finished print("โ„น๏ธ Summarization processing finished.") # --- Main video processing loop (Unchanged from previous fix) --- def video_processing_task(video_path): global thread try: cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise Exception("Cannot open video file") frame_buffer, last_save_time, recent_scores = [], 0.0, [] FRAME_SKIP = 6 # Keep increased frame skipping frame_count = 0 fps, width, height = (cap.get(cv2.CAP_PROP_FPS) or 25.0, int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) while cap.isOpened() and not stop_event.is_set(): socketio.sleep(0.002) # Keep yielding ret, frame = cap.read() if not ret: break frame_count += 1 if frame_count % (FRAME_SKIP + 1) != 0: continue # Apply skipping frame_buffer.append(frame.copy()) if len(frame_buffer) == 16: socketio.sleep(0.001) # Yield before inference # --- Anomaly Detection --- frames_resized = [cv2.resize(f, (112, 112)) for f in frame_buffer] clip_np = np.array(frames_resized, dtype=np.uint8) clip_torch = torch.from_numpy(clip_np) clip_torch = TRANSFORMS(clip_torch).unsqueeze(0).to(DEVICE) with torch.no_grad(): features = feature_extractor(clip_torch).detach() score = float(anomaly_detector(features).detach().item()) score = smooth_score(recent_scores, score) score = float(np.clip(score, 0, 1)) socketio.emit('update_graph', {'score': score}) # --- End Anomaly Detection --- # --- Anomaly Actions --- now = time.time() # --- This check enforces the 60 second cooldown --- if score > ANOMALY_THRESHOLD and (now - last_save_time) >= COOLDOWN_SECS: print(f"๐Ÿ”ฅ Anomaly! Score: {score:.2f}") last_save_time = now # Reset cooldown timer socketio.emit('recording_signal', {'recording': True}) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") clip_dir = os.path.join(SAVE_DIR, f"anomaly_{timestamp}") os.makedirs(clip_dir, exist_ok=True) # Quick YOLO yolo_frame_path = os.path.join(clip_dir, "yolo_frame.jpg") preview_frame_img = frame_buffer[-1].copy() cv2.imwrite(yolo_frame_path, preview_frame_img) try: yolo_result = analyze_video_with_yolo(yolo_frame_path, model_path=YOLO_MODEL_PATH, return_class=True) yolo_text = f"๐Ÿšจ Anomaly โ†’ YOLO: {yolo_result}" except Exception as e: yolo_text = f"YOLO Error: {e}" socketio.emit('update_yolo_text', {'text': yolo_text}) _, buffer = cv2.imencode('.jpg', preview_frame_img) b64_str = base64.b64encode(buffer).decode('utf-8') socketio.emit('update_yolo_image', {'image_data': b64_str}) socketio.emit('update_summary', {'summary': 'loading'}) socketio.sleep(0.005) # Yield before starting save task # Start the saving task (which will later schedule summarizer) print("๐Ÿš€ Starting background task for clip saving...") socketio.start_background_task( _save_clip_and_start_summarizer, # Use the modified function video_path, clip_dir, frame_buffer.copy(), fps, width, height ) # --- End anomaly actions --- frame_buffer.clear() cap.release() if not stop_event.is_set(): socketio.emit('processing_finished', {'message': 'Video finished.'}) print("๐Ÿ Video processing task ended.") except Exception as e: print(f"โŒ [ERROR] Unhandled exception in video_processing_task: {e}") socketio.emit('processing_error', {'error': f'Runtime error: {e}'}) finally: with thread_lock: if thread is not None: print("๐Ÿงน Cleaning up video processing task.") thread = None; stop_event.clear() # --- (Routes: /, /upload, /video_stream remain the same) --- @app.route('/') def index(): return render_template('index.html', anomaly_names=VIDEO_PATHS.keys()) @app.route('/upload', methods=['POST']) def upload_file(): if 'video' not in request.files: return jsonify({'error': 'No video file'}), 400 file = request.files['video']; name = file.filename if name == '': return jsonify({'error': 'No video selected'}), 400 filename = secure_filename(name); ts = datetime.now().strftime('%Y%m%d%H%M%S') unique_name = f"{ts}_{filename}"; path = os.path.join(app.config['UPLOAD_FOLDER'], unique_name) try: file.save(path); print(f"โœ… Uploaded: {path}"); return jsonify({'success': True, 'filename': unique_name}) except Exception as e: print(f"โŒ Upload failed: {e}"); return jsonify({'error': f'{e}'}), 500 @app.route('/video_stream//') def video_stream(source, filename): path, safe_name = None, secure_filename(filename) if source == 'demo': path = VIDEO_PATHS.get(filename) elif source == 'upload': path = os.path.join(app.config['UPLOAD_FOLDER'], safe_name) else: return "Invalid source", 404 if not path or not os.path.exists(path): return "Video not found", 404 def generate(): try: with open(path, "rb") as f: while chunk := f.read(1024*1024): yield chunk; socketio.sleep(0.0001) except Exception as e: print(f"โŒ Streaming error: {e}") print(f"โ„น๏ธ Streaming: {path}"); return Response(generate(), mimetype="video/mp4") # --- (SocketIO handlers: start_processing, reset_system remain the same) --- @socketio.on('start_processing') def handle_start_processing(data): global thread with thread_lock: if thread is None: stop_event.clear(); source, name = data.get('source'), data.get('filename') path, safe_name = None, secure_filename(name) if name else None if not safe_name: return socketio.emit('processing_error', {'error': 'Invalid filename.'}) if source == 'demo': path = VIDEO_PATHS.get(name) elif source == 'upload': path = os.path.join(app.config['UPLOAD_FOLDER'], safe_name) if path and os.path.exists(path): print(f"๐Ÿš€ Starting processing: '{safe_name}' ({source})") thread = socketio.start_background_task(video_processing_task, path) else: socketio.emit('processing_error', {'error': 'Video file not found.'}) else: socketio.emit('update_status', {'status': 'Processing already running.'}) @socketio.on('reset_system') def handle_reset(): global thread with thread_lock: if thread: print("๐Ÿ”„ Reset requested. Stopping..."); stop_event.set() else: print("โ„น๏ธ Reset requested, none running.") socketio.emit('system_reset_confirm'); print("โœ… Reset confirmed.") if __name__ == '__main__': print("๐Ÿ Starting Flask server...") socketio.run(app, debug=True,host='0.0.0.0', port=5000)