"""Flask server that runs YOLO detection on a camera stream and raises alerts.

Frames come either from a server-side ``cv2.VideoCapture`` (streamed back as
MJPEG on ``/video_feed``) or from the browser as base64 images posted to
``/detect_frame``. Detections of the classes in ``ALLOWED_CLASSES`` can
trigger a siren, a Telegram photo message and a Twilio voice call.
"""
import os
import sys

from dotenv import load_dotenv

# STOP OpenCV hardware warnings at the source. These are set before cv2 and
# ultralytics are imported, matching the original statement order
# (presumably both libraries read them at import time -- TODO confirm).
os.environ["OPENCV_LOG_LEVEL"] = "OFF"
os.environ["YOLO_CONFIG_DIR"] = "/tmp"

import base64
import io
import threading
import time
from collections import Counter
from datetime import datetime
from xml.sax.saxutils import escape

import cv2
import numpy as np
import requests
from flask import Flask, render_template, Response, jsonify, request
from flask import Response as FlaskResponse
from geopy.geocoders import Nominatim
from playsound import playsound
from twilio.rest import Client
from ultralytics import YOLO

# Load environment variables
load_dotenv()

app = Flask(__name__)

# --- Camera Initialization ---
CAMERA_SOURCE = os.getenv(
    "CAMERA_SOURCE",
    "https://improvisatory-armandina-nonsuppressed.ngrok-free.dev/video",
)
print(f"[INFO] Server starting... Checking camera source: {CAMERA_SOURCE}")
camera = cv2.VideoCapture(CAMERA_SOURCE)
if not camera.isOpened():
    print(f"[WARNING] Could not open ngrok source: {CAMERA_SOURCE}")
    print("[INFO] Falling back to local webcam (ID 0)...")
    camera = cv2.VideoCapture(0)
    if not camera.isOpened():
        print("[ERROR] Could not open local webcam (ID 0).")
    else:
        print("[SUCCESS] Local webcam opened successfully.")
else:
    print(f"[SUCCESS] Camera source {CAMERA_SOURCE} opened successfully.")

model = YOLO('yolo11n.pt')

# --- Global Variables ---
ALERT_INTERVAL = 300  # 5 minutes between voice calls
TELEGRAM_MIN_INTERVAL = 15  # seconds between Telegram alerts from the stream
SIREN_MIN_OBJECTS = 2  # siren plays while at least this many objects are seen
TELEGRAM_TIMEOUT = 15  # seconds before giving up on the Telegram HTTP request
ALLOWED_CLASSES = frozenset({
    "person", "bird", "cat", "dog", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe",
})

last_alert_time = 0
SITE_LOCATION_DETAILS = {
    "address": "Loading location...",
    "latitude": "N/A",
    "longitude": "N/A",
}
latest_object_counts = {}
geolocator = Nominatim(user_agent="security_monitoring_app")


def _location_known():
    """Return True once /set_location has replaced the 'N/A' placeholder."""
    return SITE_LOCATION_DETAILS.get('latitude') != 'N/A'


def _rounded_coordinate(value):
    """Return *value* rounded to 2 decimals, or unchanged if not numeric."""
    try:
        return round(float(value), 2)
    except (TypeError, ValueError):
        return value


def _build_call_message():
    """Return ``(message, language_code)`` for the voice call.

    A generic English message is used while the location is still unknown;
    otherwise a detailed Hindi message with object counts and coordinates.
    """
    if not _location_known():
        message = (
            "Hello User, an intrusion has been detected at your farm. "
            "Please check your Telegram for an image of the alert. "
            "Contact team Krushimitra for support."
        )
        return message, "en-IN"

    # Snapshot the shared dict; another thread may rebind it meanwhile.
    counts = dict(latest_object_counts)
    obj_list = ', '.join([f'{v} {k}' for k, v in counts.items()])  # e.g., "2 person"
    location_address = SITE_LOCATION_DETAILS.get('address', 'an unknown location')
    lat = _rounded_coordinate(SITE_LOCATION_DETAILS.get('latitude', 'N/A'))
    lon = _rounded_coordinate(SITE_LOCATION_DETAILS.get('longitude', 'N/A'))
    message = (
        f"नमस्ते उपयोगकर्ता, आपके खेत स्थान {location_address} पर {obj_list} का पता चला है, "
        f"अक्षांश: {lat}, देशांतर: {lon}। कृपया विस्तृत जानकारी के लिए अपना टेलीग्राम देखें। सहायता के लिए टीम कृषिमित्र से संपर्क करें, धन्यवाद।"
    )
    return message, "hi-IN"


def make_call():
    """Place a Twilio voice call that reads out the current alert.

    Credentials and phone numbers are read from the environment; when any is
    missing the call is skipped with a warning. Errors are printed rather
    than raised because this runs on a background thread.
    """
    account_sid = os.getenv("TWILIO_ACCOUNT_SID")
    auth_token = os.getenv("TWILIO_AUTH_TOKEN")
    to_number = os.getenv("TWILIO_TO_NUMBER")
    from_number = os.getenv("TWILIO_FROM_NUMBER")

    if not all([account_sid, auth_token, to_number, from_number]):
        print("[WARNING] Twilio credentials missing in environment. Call skipped.")
        return

    client = Client(account_sid, auth_token)
    try:
        message, language = _build_call_message()
        print(f"DEBUG: Sending call with message: {message}")
        # The `twiml` argument must be a TwiML document, not bare text. The
        # message is XML-escaped because the address comes from an external
        # geocoder and may contain '&' or '<'.
        # NOTE(review): confirm the language codes against the voices
        # enabled on the Twilio account.
        twiml_instructions = (
            f'<Response><Say language="{language}">{escape(message)}</Say></Response>'
        )
        call = client.calls.create(
            twiml=twiml_instructions,
            to=to_number,
            from_=from_number,
        )
        print(f"Call initiated. SID: {call.sid}")
        print("[DEBUG] Call executed successfully.")
    except Exception as e:  # thread boundary: report, never propagate
        print(f"Error making call: {e}")


def send_telegram_message(image, caption):
    """Send *image* as a JPEG photo with *caption* to the configured chat.

    The current site location is appended to the caption here, so callers
    should not add it themselves. Missing credentials, encoding failures and
    HTTP errors are printed and otherwise ignored.
    """
    TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
    CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")

    if not TOKEN or not CHAT_ID:
        print("[WARNING] Telegram credentials missing in environment. Alert skipped.")
        return

    send_photo_url = f"https://api.telegram.org/bot{TOKEN}/sendPhoto"
    ret, buffer = cv2.imencode('.jpg', image)
    if not ret:
        print("Failed to encode image for Telegram.")
        return
    files = {"photo": ("alert.jpg", buffer.tobytes(), "image/jpeg")}

    # Add full location details to caption
    location_address = SITE_LOCATION_DETAILS.get('address', 'Unknown')
    lat = SITE_LOCATION_DETAILS.get('latitude', 'N/A')
    lon = SITE_LOCATION_DETAILS.get('longitude', 'N/A')
    caption += f"\nLocation: {location_address}\nLatitude: {lat}\nLongitude: {lon}"

    data = {"chat_id": CHAT_ID, "caption": caption}
    try:
        # Without a timeout a stalled connection would block this thread forever.
        response = requests.post(
            send_photo_url, data=data, files=files, timeout=TELEGRAM_TIMEOUT
        )
    except requests.RequestException as e:
        print(f"Error sending Telegram message: {e}")
        return

    if response.status_code == 200:
        print("Telegram alert sent successfully.")
    else:
        print(f"Failed to send Telegram alert. Status: {response.status_code}, Response: {response.text}")


# Siren control (no pygame, uses playsound)
siren_thread = None
siren_stop_event = threading.Event()


def play_siren_continuous():
    """Continuously play siren while not stopped. Always starts from beginning."""
    while not siren_stop_event.is_set():
        try:
            # NOTE(review): 'alarn_tune.mp3' looks like a typo for
            # 'alarm_tune.mp3' -- confirm against the file shipped with the app.
            playsound('alarn_tune.mp3')
        except Exception as e:
            print(f"Error playing siren (likely headless environment): {e}")
            # If it fails once, it will likely fail again on a headless server.
            # We exit the thread to prevent infinite log spamming.
            break
        # If stopped during playback, break immediately
        if siren_stop_event.is_set():
            break


def start_siren():
    """Start the siren thread unless one is already running."""
    global siren_thread, siren_stop_event
    if siren_thread is None or not siren_thread.is_alive():
        siren_stop_event.clear()
        siren_thread = threading.Thread(target=play_siren_continuous, daemon=True)
        siren_thread.start()


def stop_siren():
    """Signal the siren thread to stop after the current playback."""
    global siren_stop_event
    siren_stop_event.set()


# --- Video Processing ---
def _iter_allowed_detections(results):
    """Yield ``(class_name, confidence, [x1, y1, x2, y2])`` for allowed classes."""
    for box in results[0].boxes:
        class_name = model.names[int(box.cls[0])]
        if class_name not in ALLOWED_CLASSES:
            continue
        yield class_name, float(box.conf[0]), box.xyxy[0].tolist()


def _mjpeg_part(jpeg_bytes):
    """Wrap encoded JPEG bytes as one part of the multipart MJPEG stream."""
    return (b'--frame\r\n'
            b'Content-Type: image/jpeg\r\n\r\n' + jpeg_bytes + b'\r\n')


def _camera_disconnected_frame():
    """Return a black 640x480 frame telling the viewer the camera is down."""
    fallback_frame = np.zeros((480, 640, 3), dtype=np.uint8)
    cv2.putText(fallback_frame, "CAMERA DISCONNECTED", (100, 240),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
    cv2.putText(fallback_frame, "Check Ngrok URL in Secrets", (120, 280),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)
    return fallback_frame


def gen_frames():
    """Yield annotated camera frames as MJPEG parts and trigger alerts.

    For each frame: run YOLO, draw boxes for allowed classes, update
    ``latest_object_counts``, drive the siren, and start Telegram / call
    threads when their throttling conditions are met. When the camera read
    fails, a placeholder frame is yielded and the read is retried after 5 s.
    """
    global last_alert_time, latest_object_counts

    last_telegram_time = 0
    last_object_counts = Counter()
    siren_playing = False

    while True:
        success, frame = camera.read()
        if not success:
            print("[DEBUG] Camera read failed. Yielding fallback frame...")
            ret, buffer = cv2.imencode('.jpg', _camera_disconnected_frame())
            if ret:
                yield _mjpeg_part(buffer.tobytes())

            time.sleep(5)  # Wait 5 seconds before retrying
            # Try to re-initialize camera if it was never successful
            if not camera.isOpened():
                camera.open(CAMERA_SOURCE)
            continue

        print("[DEBUG] Frame read successfully. Starting detection...")
        results = model(frame, verbose=False)  # Suppress YOLO prints

        detected_objects = []
        for class_name, confidence, (x1, y1, x2, y2) in _iter_allowed_detections(results):
            detected_objects.append(class_name)
            label = f"{class_name} ({confidence:.2f})"
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (255, 0, 0), 2)
            cv2.putText(frame, label, (int(x1), int(y1) - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)

        object_counts = Counter(detected_objects)
        latest_object_counts = dict(object_counts)
        total_objects = sum(object_counts.values())

        # Siren logic: play if object count >= 2, stop if < 2. (The previous
        # `>= 0` test was always true, so the siren never stopped.)
        if total_objects >= SIREN_MIN_OBJECTS:
            if not siren_playing:
                start_siren()
                siren_playing = True
        elif siren_playing:
            stop_siren()
            siren_playing = False

        # --- Alert Logic ---
        current_time = time.time()

        # Telegram: only when the counts differ from the last alert sent, and
        # at least TELEGRAM_MIN_INTERVAL seconds have passed since it.
        send_telegram = False
        if total_objects > 0:
            counts_changed = object_counts != last_object_counts
            if counts_changed and current_time - last_telegram_time >= TELEGRAM_MIN_INTERVAL:
                send_telegram = True
                last_telegram_time = current_time
                last_object_counts = object_counts.copy()
        else:
            last_object_counts = Counter()

        if send_telegram:
            detected_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            obj_list = ', '.join([f'{k}: {v}' for k, v in object_counts.items()])
            # Location is appended by send_telegram_message; adding it here
            # as well would print it twice in the caption.
            caption = (
                f"ALERT: Intrusion Detected!\n"
                f"Objects: {obj_list}\n"
                f"Time: {detected_time}"
            )
            threading.Thread(target=send_telegram_message, args=(frame.copy(), caption)).start()

        # Call only if enough time has passed since last call
        if (total_objects > 0
                and current_time - last_alert_time >= ALERT_INTERVAL
                and _location_known()):
            last_alert_time = current_time
            threading.Thread(target=make_call).start()

        # Encode the frame for streaming
        ret, buffer = cv2.imencode('.jpg', frame)
        if not ret:
            continue
        yield _mjpeg_part(buffer.tobytes())


# --- Flask Routes ---
@app.route('/')
def index():
    """Render the main dashboard page."""
    return render_template('index.html')


@app.route('/video_feed')
def video_feed():
    """Video streaming route."""
    return Response(gen_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/set_location', methods=['POST'])
def set_location():
    """Sets the site location from client-side data.

    Expects JSON with ``latitude`` and ``longitude``. Returns 400 when they
    are missing, non-numeric or out of range. When the reverse lookup fails
    the coordinates are still stored and a 500 with the error is returned.
    """
    global SITE_LOCATION_DETAILS
    invalid = (jsonify({"status": "error", "message": "Invalid location data"}), 400)

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'latitude' not in data or 'longitude' not in data:
        return invalid

    lat, lon = data['latitude'], data['longitude']
    try:
        lat_value, lon_value = float(lat), float(lon)
    except (TypeError, ValueError):
        return invalid
    # Range check also rejects NaN, for which every comparison is False.
    if not (-90 <= lat_value <= 90 and -180 <= lon_value <= 180):
        return invalid

    try:
        location_details = geolocator.reverse(f"{lat},{lon}", language='en')
    except Exception as e:  # geocoder/network failure: keep raw coordinates
        SITE_LOCATION_DETAILS = {
            "address": f"Lat: {lat}, Lon: {lon} (Reverse lookup failed)",
            "latitude": str(lat),
            "longitude": str(lon),
        }
        return jsonify({
            "status": "error",
            "message": str(e),
            "location": SITE_LOCATION_DETAILS["address"],
            "latitude": lat,
            "longitude": lon,
        }), 500

    address = location_details.address if location_details else f"Lat: {lat}, Lon: {lon}"
    SITE_LOCATION_DETAILS = {
        "address": address,
        "latitude": str(lat),
        "longitude": str(lon),
    }
    return jsonify({"status": "success", "location": address, "latitude": lat, "longitude": lon})


@app.route('/detect_frame', methods=['POST'])
def detect_frame():
    """Receives a base64 frame from the browser, runs local YOLO, and triggers alerts.

    The ``image`` field may be a data URL (``data:image/jpeg;base64,...``) or
    bare base64. Returns the detections and per-class counts as JSON, 400 for
    missing/undecodable images, and 500 for unexpected failures.
    """
    global last_alert_time, latest_object_counts

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'image' not in data:
        return jsonify({"error": "No image data"}), 400

    image_field = data['image']
    if not isinstance(image_field, str):
        return jsonify({"error": "Invalid image"}), 400

    # Decode base64 image; drop an optional "data:...;base64," header.
    encoded = image_field.split(",", 1)[-1]
    try:
        image_bytes = base64.b64decode(encoded)
    except ValueError:  # binascii.Error is a ValueError subclass
        return jsonify({"error": "Invalid image"}), 400
    if not image_bytes:
        return jsonify({"error": "Invalid image"}), 400

    try:
        nparr = np.frombuffer(image_bytes, np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if frame is None:
            return jsonify({"error": "Invalid image"}), 400

        # Run local YOLO detection
        results = model(frame, verbose=False)

        detections = []
        detected_objects = []
        for class_name, conf, bbox in _iter_allowed_detections(results):
            detections.append({
                "class": class_name,
                "confidence": conf,
                "bbox": bbox,
            })
            detected_objects.append(class_name)

        object_counts = Counter(detected_objects)
        latest_object_counts = dict(object_counts)

        # Alert logic: call and Telegram share the ALERT_INTERVAL throttle so
        # a browser posting frames continuously cannot flood either channel.
        current_time = time.time()
        if ("person" in object_counts
                and current_time - last_alert_time >= ALERT_INTERVAL
                and _location_known()):
            last_alert_time = current_time
            threading.Thread(target=make_call).start()

            # Send Telegram alert
            detected_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            obj_list = ', '.join([f'{k}: {v}' for k, v in object_counts.items()])
            caption = f"BROWSER ALERT: Intrusion Detected!\nObjects: {obj_list}\nTime: {detected_time}"
            threading.Thread(target=send_telegram_message, args=(frame.copy(), caption)).start()

        return jsonify({
            "detections": detections,
            "object_counts": latest_object_counts,
        })
    except Exception as e:  # request boundary: report as JSON 500
        return jsonify({"error": str(e)}), 500


@app.route('/object_stats', methods=['GET'])
def object_stats():
    """Endpoint to provide object counts and location to the frontend."""
    return jsonify({
        "object_counts": latest_object_counts,
        "location": SITE_LOCATION_DETAILS["address"],
        "latitude": SITE_LOCATION_DETAILS["latitude"],
        "longitude": SITE_LOCATION_DETAILS["longitude"],
    })


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, threaded=True)