| from flask import Flask, render_template, Response, jsonify, request
|
| from flask import Response as FlaskResponse
|
| import os
|
| import sys
|
| from dotenv import load_dotenv
|
|
|
|
|
# Library configuration passed through the process environment.
# NOTE(review): this sits above `import cv2` / `from ultralytics import YOLO`,
# presumably because those packages read the variables at import time --
# confirm before moving it.
os.environ.update({
    "OPENCV_LOG_LEVEL": "OFF",
    "YOLO_CONFIG_DIR": "/tmp",
})
|
|
|
| import cv2
|
| import time
|
| import numpy as np
|
| import threading
|
| import requests
|
| from flask import Flask, render_template, Response, request, jsonify
|
| from flask import Response as FlaskResponse
|
| from twilio.rest import Client
|
| from playsound import playsound
|
| from ultralytics import YOLO
|
| from collections import Counter
|
| from datetime import datetime
|
| from geopy.geocoders import Nominatim
|
| import base64
|
| import io
|
|
|
|
|
|
|
# Load variables via python-dotenv before any os.getenv() lookup below.
load_dotenv()

app = Flask(__name__)

# Video source; overridable through the CAMERA_SOURCE environment variable.
CAMERA_SOURCE = os.getenv("CAMERA_SOURCE", "https://improvisatory-armandina-nonsuppressed.ngrok-free.dev/video")
print(f"[INFO] Server starting... Checking camera source: {CAMERA_SOURCE}")

camera = cv2.VideoCapture(CAMERA_SOURCE)
if camera.isOpened():
    print(f"[SUCCESS] Camera source {CAMERA_SOURCE} opened successfully.")
else:
    # The configured source could not be opened: try local device 0 instead.
    print(f"[WARNING] Could not open ngrok source: {CAMERA_SOURCE}")
    print("[INFO] Falling back to local webcam (ID 0)...")
    camera = cv2.VideoCapture(0)
    print(
        "[SUCCESS] Local webcam opened successfully."
        if camera.isOpened()
        else "[ERROR] Could not open local webcam (ID 0)."
    )

# Detector shared by the streaming generator and the /detect_frame route.
model = YOLO('yolo11n.pt')
|
|
|
|
|
|
|
# Minimum number of seconds between two phone-call alerts.
ALERT_INTERVAL = 300

# time.time() stamp of the most recent phone-call alert (0 = none yet).
last_alert_time = 0

# Site location included in alerts. A latitude of "N/A" means no location has
# been submitted yet; the alerting code checks for that exact value.
SITE_LOCATION_DETAILS = dict(
    address="Loading location...",
    latitude="N/A",
    longitude="N/A",
)

# Per-class detection counts from the most recently processed frame.
latest_object_counts = dict()

# Reverse-geocoding client used by the /set_location route.
geolocator = Nominatim(user_agent="security_monitoring_app")
|
|
|
|
|
def _rounded_coordinate(raw_value):
    """Return *raw_value* rounded to two decimals, or unchanged if it is not numeric."""
    try:
        return round(float(raw_value), 2)
    except (TypeError, ValueError):
        return raw_value


def make_call():
    """Place a Twilio voice call announcing the current intrusion.

    Credentials and phone numbers are read from the TWILIO_ACCOUNT_SID,
    TWILIO_AUTH_TOKEN, TWILIO_TO_NUMBER and TWILIO_FROM_NUMBER environment
    variables; if any is missing the call is skipped with a warning.

    When no site location is known (latitude is 'N/A') a generic English
    message is spoken. Otherwise a Hindi message naming the detected objects,
    the address and the rounded coordinates is used.

    Returns:
        None. Failures are printed, never raised, because this function is
        run on a background thread with nobody to handle an exception.
    """
    # Function-scope import: the escaping helper is needed only here.
    from xml.sax.saxutils import escape

    account_sid = os.getenv("TWILIO_ACCOUNT_SID")
    auth_token = os.getenv("TWILIO_AUTH_TOKEN")
    to_number = os.getenv("TWILIO_TO_NUMBER")
    from_number = os.getenv("TWILIO_FROM_NUMBER")

    if not all([account_sid, auth_token, to_number, from_number]):
        print("[WARNING] Twilio credentials missing in environment. Call skipped.")
        return

    # Snapshot the shared state once so the message is internally consistent
    # even if another thread rebinds these globals while the call is built.
    location = SITE_LOCATION_DETAILS
    object_counts = dict(latest_object_counts)

    try:
        client = Client(account_sid, auth_token)

        if location.get('latitude') == 'N/A':
            message = "Hello User, an intrusion has been detected at your farm. Please check your Telegram for an image of the alert. Contact team Krushimitra for support."
        else:
            obj_list = ', '.join(f'{v} {k}' for k, v in object_counts.items())
            location_address = location.get('address', 'an unknown location')
            lat = _rounded_coordinate(location.get('latitude', 'N/A'))
            lon = _rounded_coordinate(location.get('longitude', 'N/A'))

            message = (
                f"नमस्ते उपयोगकर्ता, आपके खेत स्थान {location_address} पर {obj_list} का पता चला है, "
                f"अक्षांश: {lat}, देशांतर: {lon}। कृपया विस्तृत जानकारी के लिए अपना टेलीग्राम देखें। सहायता के लिए टीम कृषिमित्र से संपर्क करें, धन्यवाद।"
            )

        print(f"DEBUG: Sending call with message: {message}")
        # The message embeds a geocoded address, which may contain '&' or '<';
        # escape it so the TwiML stays well-formed XML.
        twiml_instructions = f'<Response><Say voice="Polly.Aditi">{escape(message)}</Say></Response>'

        call = client.calls.create(
            twiml=twiml_instructions,
            to=to_number,
            from_=from_number
        )
        print(f"Call initiated. SID: {call.sid}")
        print("[DEBUG] Call executed successfully.")

    except Exception as e:
        print(f"Error making call: {e}")
|
|
|
|
|
|
|
# Upper bound, in seconds, on how long one Telegram upload may block.
_TELEGRAM_TIMEOUT_SECONDS = 15
# The Telegram Bot API documents a 1024-character limit for photo captions.
_TELEGRAM_CAPTION_LIMIT = 1024


def send_telegram_message(image, caption):
    """Send *image* with *caption* to the configured Telegram chat.

    The bot token and chat id come from the TELEGRAM_BOT_TOKEN and
    TELEGRAM_CHAT_ID environment variables; if either is missing the alert is
    skipped with a warning. The current site address and coordinates are
    appended to the caption before sending.

    Args:
        image: Frame to send; it is JPEG-encoded with ``cv2.imencode``.
        caption: Text shown under the photo.

    Returns:
        None. Failures are printed, never raised, because this function is
        run on a background thread.
    """
    TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
    CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")

    if not TOKEN or not CHAT_ID:
        print("[WARNING] Telegram credentials missing in environment. Alert skipped.")
        return

    if image is None:
        print("Failed to encode image for Telegram.")
        return

    ret, buffer = cv2.imencode('.jpg', image)
    if not ret:
        print("Failed to encode image for Telegram.")
        return
    files = {"photo": ("alert.jpg", buffer.tobytes(), "image/jpeg")}

    # Read the shared location once so all three fields come from one update.
    location = SITE_LOCATION_DETAILS
    location_address = location.get('address', 'Unknown')
    lat = location.get('latitude', 'N/A')
    lon = location.get('longitude', 'N/A')
    full_caption = f"{caption}\nLocation: {location_address}\nLatitude: {lat}\nLongitude: {lon}"
    # An over-long caption would make Telegram reject the whole alert.
    full_caption = full_caption[:_TELEGRAM_CAPTION_LIMIT]

    send_photo_url = f"https://api.telegram.org/bot{TOKEN}/sendPhoto"
    data = {"chat_id": CHAT_ID, "caption": full_caption}
    try:
        # Without a timeout a stalled connection would block this thread forever.
        response = requests.post(
            send_photo_url, data=data, files=files, timeout=_TELEGRAM_TIMEOUT_SECONDS
        )
    except Exception as e:
        print(f"Error sending Telegram message: {e}")
        return

    if response.status_code == 200:
        print("Telegram alert sent successfully.")
    else:
        print(f"Failed to send Telegram alert. Status: {response.status_code}, Response: {response.text}")
|
|
|
|
|
# Thread currently running the siren loop (None until the siren is first started).
siren_thread = None
# Set to ask the siren loop to exit once the current playback call returns.
siren_stop_event = threading.Event()


def play_siren_continuous():
    """Replay the siren sound until ``siren_stop_event`` is set.

    Each ``playsound`` call blocks for one playback, so a stop request takes
    effect only between playbacks. If playback raises, the error is printed
    and the loop ends.
    """
    while not siren_stop_event.is_set():
        try:
            # NOTE(review): 'alarn_tune.mp3' looks like a typo for
            # 'alarm_tune.mp3' -- confirm against the file on disk.
            playsound('alarn_tune.mp3')
        except Exception as e:
            print(f"Error playing siren (likely headless environment): {e}")
            break


def start_siren():
    """Start the siren, or keep it running if it is already playing.

    The stop flag is cleared unconditionally. Previously it was cleared only
    when no siren thread was alive, so a stop followed by a start during one
    playback left the flag set and the siren died silently afterwards.
    """
    global siren_thread
    siren_stop_event.clear()
    if siren_thread is None or not siren_thread.is_alive():
        siren_thread = threading.Thread(target=play_siren_continuous, daemon=True)
        siren_thread.start()


def stop_siren():
    """Ask the siren loop to stop after the playback currently in progress."""
    siren_stop_event.set()
|
|
|
|
|
|
|
|
|
|
|
def _mjpeg_part(jpeg_bytes):
    """Wrap one encoded JPEG as a part of the 'frame'-delimited multipart stream."""
    return b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + jpeg_bytes + b'\r\n'


def _camera_offline_part():
    """Render the "camera disconnected" placeholder as a stream part, or None if encoding fails."""
    placeholder = np.zeros((480, 640, 3), dtype=np.uint8)
    cv2.putText(placeholder, "CAMERA DISCONNECTED", (100, 240),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
    cv2.putText(placeholder, "Check Ngrok URL in Secrets", (120, 280),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)
    ok, buffer = cv2.imencode('.jpg', placeholder)
    return _mjpeg_part(buffer.tobytes()) if ok else None


def gen_frames():
    """Yield annotated camera frames as multipart JPEG parts, raising alerts on the way.

    For every frame read from the module-level ``camera`` this runs the YOLO
    ``model``, keeps detections whose class is in the allowed set, draws
    them, and publishes the per-class counts in ``latest_object_counts``.

    Alerts:
      * the siren plays while at least one allowed object is visible and is
        stopped when none is;
      * a Telegram photo is sent when the counts differ from the last ones
        sent and at least 15 seconds have passed since the previous photo;
      * a phone call is placed at most once per ``ALERT_INTERVAL`` seconds,
        and only once a site location has been set.

    When a frame cannot be read, a placeholder image is yielded, the loop
    sleeps 5 seconds, and the camera is reopened if it reports closed.

    Yields:
        bytes: one ``--frame`` part containing a JPEG image.
    """
    global last_alert_time, latest_object_counts
    allowed_classes = {"person", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe"}
    last_telegram_time = 0
    last_object_counts = Counter()
    siren_playing = False

    while True:
        success, frame = camera.read()
        if not success:
            print("[DEBUG] Camera read failed. Yielding fallback frame...")
            offline_part = _camera_offline_part()
            if offline_part is not None:
                yield offline_part

            time.sleep(5)

            if not camera.isOpened():
                camera.open(CAMERA_SOURCE)
            continue

        print("[DEBUG] Frame read successfully. Starting detection...")

        results = model(frame, verbose=False)
        detected_objects = []
        for box in results[0].boxes:
            class_name = model.names[int(box.cls[0])]
            if class_name not in allowed_classes:
                continue
            detected_objects.append(class_name)
            x1, y1, x2, y2 = (int(v) for v in box.xyxy[0])
            label = f"{class_name} ({float(box.conf[0]):.2f})"
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)

        object_counts = Counter(detected_objects)
        latest_object_counts = dict(object_counts)
        # Counter() of a list holds only positive counts, so non-empty means
        # "something was detected".
        intruder_present = bool(object_counts)

        # The original compared the total with ">= 0", which is always true:
        # the siren started with nothing in view and could never be stopped.
        if intruder_present and not siren_playing:
            start_siren()
            siren_playing = True
        elif not intruder_present and siren_playing:
            stop_siren()
            siren_playing = False

        current_time = time.time()

        if intruder_present:
            counts_changed = object_counts != last_object_counts
            if counts_changed and current_time - last_telegram_time >= 15:
                last_telegram_time = current_time
                last_object_counts = object_counts.copy()

                detected_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                obj_list = ', '.join(f'{k}: {v}' for k, v in object_counts.items())
                # send_telegram_message() appends the location lines itself,
                # so they are not repeated here.
                caption = (
                    f"ALERT: Intrusion Detected!\n"
                    f"Objects: {obj_list}\n"
                    f"Time: {detected_time}"
                )
                threading.Thread(target=send_telegram_message, args=(frame.copy(), caption)).start()
        else:
            # Scene is clear: forget the last counts so the next appearance alerts again.
            last_object_counts = Counter()

        location_known = SITE_LOCATION_DETAILS.get('latitude') != 'N/A'
        if intruder_present and location_known and current_time - last_alert_time >= ALERT_INTERVAL:
            last_alert_time = current_time
            threading.Thread(target=make_call).start()

        ret, buffer = cv2.imencode('.jpg', frame)
        if not ret:
            continue
        yield _mjpeg_part(buffer.tobytes())
|
|
|
|
|
@app.route('/')
def index():
    """Serve the dashboard by rendering the 'index.html' template."""
    dashboard_page = render_template('index.html')
    return dashboard_page
|
|
|
@app.route('/video_feed')
def video_feed():
    """Stream the frames produced by gen_frames() as a multipart response."""
    frame_stream = gen_frames()
    stream_mimetype = 'multipart/x-mixed-replace; boundary=frame'
    return Response(frame_stream, mimetype=stream_mimetype)
|
|
|
def _parse_coordinate(raw_value, limit):
    """Return *raw_value* as a float within [-limit, limit], or None if it is not a valid coordinate."""
    if isinstance(raw_value, bool):
        return None
    try:
        number = float(raw_value)
    except (TypeError, ValueError):
        return None
    # NaN fails both comparisons, so it is rejected here as well.
    return number if -limit <= number <= limit else None


@app.route('/set_location', methods=['POST'])
def set_location():
    """Store the site location sent by the client and reverse-geocode it.

    Expects a JSON object with 'latitude' and 'longitude'. Both must be
    numeric (or numeric strings) within [-90, 90] and [-180, 180].

    Returns:
        * 200 with status "success" and the resolved address;
        * 500 with status "error" when the reverse lookup raises. The
          coordinates are still stored, with a placeholder address;
        * 400 when the body is missing, not an object, or the coordinates
          are invalid.
    """
    global SITE_LOCATION_DETAILS
    # silent=True: a missing or malformed body yields None instead of an
    # exception, so the route's own 400 JSON error below is used.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'latitude' not in data or 'longitude' not in data:
        return jsonify({"status": "error", "message": "Invalid location data"}), 400

    lat, lon = data['latitude'], data['longitude']
    # Reject values such as null, "abc" or 1234 before they reach the shared
    # state that the alert code reads.
    if _parse_coordinate(lat, 90) is None or _parse_coordinate(lon, 180) is None:
        return jsonify({"status": "error", "message": "Invalid location data"}), 400

    lookup_error = None
    try:
        location_details = geolocator.reverse(f"{lat},{lon}", language='en')
        address = location_details.address if location_details else f"Lat: {lat}, Lon: {lon}"
    except Exception as e:
        lookup_error = e
        address = f"Lat: {lat}, Lon: {lon} (Reverse lookup failed)"

    # Rebind the global in one step so readers on other threads never see a
    # half-updated dictionary.
    SITE_LOCATION_DETAILS = {
        "address": address,
        "latitude": str(lat),
        "longitude": str(lon)
    }

    if lookup_error is not None:
        return jsonify({"status": "error", "message": str(lookup_error), "location": address, "latitude": lat, "longitude": lon}), 500
    return jsonify({"status": "success", "location": address, "latitude": lat, "longitude": lon})
|
|
|
# Minimum seconds between Telegram photos triggered by browser-submitted
# frames; matches the 15-second spacing used by the camera stream.
_BROWSER_TELEGRAM_INTERVAL = 15
# time.time() stamp of the last Telegram photo sent by detect_frame().
_last_browser_telegram_time = 0.0


@app.route('/detect_frame', methods=['POST'])
def detect_frame():
    """Run YOLO on a base64 frame posted by the browser and trigger alerts.

    Expects a JSON object whose 'image' field is either a data URL
    ("data:image/jpeg;base64,<payload>") or a bare base64 payload.

    Alerts:
      * a phone call when a person is detected, a site location is set and
        ``ALERT_INTERVAL`` seconds have passed since the last call;
      * a Telegram photo when at least one allowed object is detected, at
        most once every ``_BROWSER_TELEGRAM_INTERVAL`` seconds. Previously a
        photo was sent for every request, even with nothing detected.

    Returns:
        200 with 'detections' (class, confidence, bbox) and 'object_counts';
        400 for a missing or undecodable image; 500 for unexpected failures.
    """
    global last_alert_time, latest_object_counts, _last_browser_telegram_time

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'image' not in data:
        return jsonify({"error": "No image data"}), 400

    image_field = data['image']
    if not isinstance(image_field, str):
        return jsonify({"error": "Invalid image"}), 400

    try:
        # Drop an optional "data:...;base64," header; a payload without a
        # comma is taken as bare base64 instead of failing with a 500.
        encoded = image_field.split(",", 1)[-1]
        try:
            image_bytes = base64.b64decode(encoded)
        except ValueError:  # binascii.Error is a ValueError subclass
            return jsonify({"error": "Invalid image"}), 400

        nparr = np.frombuffer(image_bytes, np.uint8)
        # Skip the decoder for an empty buffer so it is reported as a bad image.
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if nparr.size else None
        if frame is None:
            return jsonify({"error": "Invalid image"}), 400

        results = model(frame, verbose=False)
        allowed_classes = {"person", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe"}

        detections = []
        for box in results[0].boxes:
            class_name = model.names[int(box.cls[0])]
            if class_name not in allowed_classes:
                continue
            detections.append({
                "class": class_name,
                "confidence": float(box.conf[0]),
                "bbox": box.xyxy[0].tolist()
            })

        object_counts = Counter(item["class"] for item in detections)
        latest_object_counts = dict(object_counts)

        current_time = time.time()
        # NOTE(review): calls are limited to "person" here, while gen_frames()
        # calls for any allowed class -- confirm which rule is intended.
        if "person" in object_counts and (current_time - last_alert_time >= ALERT_INTERVAL) and (SITE_LOCATION_DETAILS.get('latitude') != 'N/A'):
            last_alert_time = current_time
            threading.Thread(target=make_call).start()

        if object_counts and (current_time - _last_browser_telegram_time >= _BROWSER_TELEGRAM_INTERVAL):
            _last_browser_telegram_time = current_time
            detected_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            obj_list = ', '.join(f'{k}: {v}' for k, v in object_counts.items())
            caption = f"BROWSER ALERT: Intrusion Detected!\nObjects: {obj_list}\nTime: {detected_time}"
            threading.Thread(target=send_telegram_message, args=(frame.copy(), caption)).start()

        return jsonify({
            "detections": detections,
            "object_counts": latest_object_counts
        })

    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
@app.route('/object_stats', methods=['GET'])
def object_stats():
    """Return the latest detection counts and the stored site location as JSON."""
    payload = {"object_counts": latest_object_counts}
    site = SITE_LOCATION_DETAILS
    payload["location"] = site["address"]
    payload["latitude"] = site["latitude"]
    payload["longitude"] = site["longitude"]
    return jsonify(payload)
|
|
|
if __name__ == '__main__':
    # Development entry point: listen on every interface, port 5000, with
    # Flask's threaded server enabled.
    server_options = {"host": '0.0.0.0', "port": 5000, "threaded": True}
    app.run(**server_options)