Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, Response, jsonify | |
| import cv2 | |
| import time | |
| import numpy as np | |
| import threading | |
| import requests | |
| import os | |
| import atexit | |
| from twilio.rest import Client | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
# Load variables from a local .env file (if one exists) into the process
# environment. This runs first so that the os.getenv() lookups further down
# can see those values.
load_dotenv()

# The Flask application object used to serve this app.
app = Flask(__name__)
# Credentials and deployment settings are read exclusively from the
# environment (Hugging Face Spaces secrets, or a local .env file).
#
# Secrets deliberately have NO in-source fallback: a missing variable yields
# None, which makes the corresponding integration fail with a logged error
# instead of silently using a credential baked into the repository.
# NOTE: any credential that has ever been committed to source control must be
# treated as compromised and rotated.
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN")
TWILIO_FROM_NUMBER = os.getenv("TWILIO_FROM_NUMBER")
TWILIO_TO_NUMBER = os.getenv("TWILIO_TO_NUMBER")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
ROBOFLOW_API_KEY = os.getenv("ROBOFLOW_API_KEY")

# Human-readable site description included in alert captions. This is not a
# secret, so a placeholder default is kept.
SITE_LOCATION = os.getenv("SITE_LOCATION", "1234 Main St, City, Country")
# Try to open the default webcam (device 0). When that is not possible, e.g.
# on Hugging Face Spaces, switch to demo mode and prepare a static
# placeholder frame that the streaming loop animates instead.
USE_DEMO_MODE = False
try:
    camera = cv2.VideoCapture(0)
    if not camera.isOpened():
        raise Exception("Could not open camera")
except Exception as exc:
    print(f"Camera error: {exc}. Using demo mode.")
    USE_DEMO_MODE = True
    # 480x640x3 array of zeros (an all-black image) with a white caption.
    demo_frame = np.zeros((480, 640, 3), dtype=np.uint8)
    cv2.putText(demo_frame, "Demo Mode - No Camera Access", (50, 240),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
# Roboflow inference client. It stays None when the optional inference SDK is
# not installed; process_frame() then returns placeholder detections.
CLIENT = None
try:
    from inference_sdk import InferenceHTTPClient
    CLIENT = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
except ImportError:
    print("Inference SDK not available. Using placeholder detection.")
# --- Timing settings (all values in seconds) ---
DETECTION_INTERVAL = 3
ALERT_INTERVAL = 300     # minimum gap between two alerts
DETECTION_COOLDOWN = 10  # minimum gap between updates of the detection counts

# time.time() stamps of the most recent alert / count update (0 = not yet).
last_alert_time = 0
last_count_time = 0

# Class names tracked by this project.
PROJECT_CLASSES = [
    "Balls",
    "Bird",
    "Cat",
    "Dog",
    "Elephant",
    "Pig",
    "Tikus",
    "apple",
    "bean",
    "bunny",
    "cattle",
    "cute",
    "leopard",
    "lion",
    "rat",
    "standpig",
    "tiger",
    "Person",
]

# Running number of detections per tracked class, all starting at zero.
detection_counts = dict.fromkeys(PROJECT_CLASSES, 0)

# Most recent alerts, oldest first (the streaming loop trims it to 10 entries).
alert_history = []
# Registered with atexit so the webcam handle is released on interpreter exit.
@atexit.register
def cleanup():
    """Release the webcam, if one was opened, when the application exits."""
    # Nothing to release in demo mode or when the capture is missing/closed.
    if USE_DEMO_MODE or camera is None or not camera.isOpened():
        return
    camera.release()
    print("Camera released.")
def make_call():
    """Place an outbound voice call through Twilio.

    The call goes from ``TWILIO_FROM_NUMBER`` to ``TWILIO_TO_NUMBER`` and uses
    Twilio's demo voice document as its instructions.

    Returns:
        True when the call request succeeded, False when any error occurred
        (the error is printed, not raised).
    """
    call_params = {
        "url": "http://demo.twilio.com/docs/voice.xml",
        "to": TWILIO_TO_NUMBER,
        "from_": TWILIO_FROM_NUMBER,
    }
    try:
        twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
        placed_call = twilio_client.calls.create(**call_params)
        print("Call initiated. Call SID:", placed_call.sid)
    except Exception as exc:
        print(f"Failed to make call: {exc}")
        return False
    return True
def send_telegram_message(image, caption):
    """Send an alert image with a caption to the configured Telegram chat.

    Args:
        image: Image to send; it is JPEG-encoded with ``cv2.imencode``.
        caption: Text attached to the photo.

    Returns:
        True if the Telegram API answered with HTTP 200. False if encoding
        failed, the API returned another status, or any error occurred
        (errors are printed, not raised).
    """
    try:
        send_photo_url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendPhoto"
        ret, buffer = cv2.imencode('.jpg', image)
        if not ret:
            print("Failed to encode image.")
            return False
        files = {"photo": ("alert.jpg", buffer.tobytes(), "image/jpeg")}
        data = {"chat_id": TELEGRAM_CHAT_ID, "caption": caption}
        # requests waits indefinitely by default; bound the call so an
        # unreachable or stalled API cannot hang the caller forever.
        response = requests.post(send_photo_url, data=data, files=files, timeout=10)
        if response.status_code == 200:
            print("Telegram alert sent.")
            return True
        print(f"Failed to send Telegram alert. Status code: {response.status_code}")
        return False
    except Exception as e:
        # Connection errors from requests include the request URL, and the
        # URL contains the bot token, so scrub it before logging.
        error_text = str(e)
        if TELEGRAM_TOKEN:
            error_text = error_text.replace(str(TELEGRAM_TOKEN), "<redacted>")
        print(f"Error sending Telegram message: {error_text}")
        return False
def play_siren():
    """Stand-in for an audible siren: only prints a notice, plays no sound."""
    notice = "Alert sound would play here (disabled in HF Spaces)"
    print(notice)
def process_frame(frame):
    """Run object detection on one frame and tally matches of project classes.

    When no inference client is available (``CLIENT is None``) a single
    placeholder "Person" prediction centred in the frame is returned and no
    counts are touched.

    Otherwise the frame is written to a private temporary JPEG, sent to the
    Roboflow model, and every prediction whose class matches an entry of
    ``PROJECT_CLASSES`` (case-insensitively) is tallied:

    * ``detected_objects`` (returned) always describes the current frame, so
      the caller can react to a detection on any frame.
    * the global ``detection_counts`` totals are only incremented when at
      least ``DETECTION_COOLDOWN`` seconds have passed since the previous
      update, so a stationary object does not inflate them on every frame.

    Args:
        frame: Image array as produced by ``camera.read()``.

    Returns:
        Tuple ``(predictions, detected_objects)``: the raw prediction dicts
        (empty on inference failure) and a ``{project_class: count}`` dict
        for this frame.
    """
    global detection_counts, last_count_time
    import tempfile

    if CLIENT is None:
        # Generate demo predictions if Roboflow isn't available
        predictions = [
            {
                'class': 'Person',
                'confidence': 0.92,
                'x': frame.shape[1] // 2,
                'y': frame.shape[0] // 2,
                'width': 100,
                'height': 200
            }
        ]
        detected_objects = {'Person': 1}
        return predictions, detected_objects

    predictions = []
    image_path = None
    try:
        # A unique file per call: several stream generators may run at the
        # same time and must not overwrite or delete each other's frame.
        fd, image_path = tempfile.mkstemp(suffix=".jpg")
        os.close(fd)
        if cv2.imwrite(image_path, frame):
            # Perform object detection using Roboflow
            result = CLIENT.infer(image_path, model_id="yolov8n-640")
            predictions = result.get('predictions', [])
        else:
            print("Failed to write frame to temporary file.")
    except Exception as e:
        print(f"Error during inference: {e}")
        predictions = []
    finally:
        # Always remove the temporary file, even when inference failed.
        if image_path is not None:
            try:
                os.remove(image_path)
            except OSError as e:
                print(f"Failed to remove temporary file: {e}")

    # Lower-cased name -> canonical project class (first occurrence wins).
    canonical = {}
    for project_class in PROJECT_CLASSES:
        canonical.setdefault(project_class.lower(), project_class)

    current_frame_time = time.time()
    update_totals = current_frame_time - last_count_time >= DETECTION_COOLDOWN

    detected_objects = {}
    for obj in predictions:
        # Tolerate predictions without a 'class' entry instead of raising.
        project_class = canonical.get(str(obj.get('class', '')).lower())
        if project_class is None:
            continue
        detected_objects[project_class] = detected_objects.get(project_class, 0) + 1
        if update_totals:
            detection_counts[project_class] = detection_counts.get(project_class, 0) + 1

    if update_totals:
        last_count_time = current_frame_time

    return predictions, detected_objects
def gen_frames():
    """Yield an endless multipart JPEG stream of annotated frames.

    Every iteration:
      1. obtains a frame and its detections (synthetic ones in demo mode,
         otherwise ``camera.read()`` followed by ``process_frame``);
      2. draws a labelled box for each prediction;
      3. if something was detected and ``ALERT_INTERVAL`` seconds have passed
         since the last alert, records the alert in ``alert_history`` (capped
         at 10 entries) and prints it;
      4. stamps the current time on the image and yields it as one
         ``--frame`` part.
    """
    global last_alert_time, alert_history

    font = cv2.FONT_HERSHEY_SIMPLEX

    while True:
        if not USE_DEMO_MODE:
            # Live camera: skip this round if no frame could be grabbed.
            grabbed, frame = camera.read()
            if not grabbed:
                print("Failed to capture frame from camera")
                time.sleep(0.1)
                continue
            predictions, detected_objects = process_frame(frame)
        else:
            # Demo mode: placeholder frame plus a circle that moves over time
            # to show the stream is alive.
            frame = demo_frame.copy()
            now = time.time()
            x = int(320 + 200 * np.sin(now))
            y = int(240 + 100 * np.cos(now))
            cv2.circle(frame, (x, y), 20, (0, 165, 255), -1)

            # The fake detection is present during the first 5 seconds of
            # each 10-second window and absent during the rest.
            fake_person = {
                'class': 'Person',
                'confidence': 0.92,
                'x': x,
                'y': y,
                'width': 100,
                'height': 200
            }
            predictions = [fake_person] if time.time() % 10 < 5 else []
            detected_objects = {'Person': 1} if predictions else {}

        # Annotate every prediction; x/y are used as the centre of the box.
        for det in predictions:
            cx, cy = int(det['x']), int(det['y'])
            box_w, box_h = int(det['width']), int(det['height'])
            class_name = det['class']
            confidence = det['confidence']

            # Red for persons, green for everything else.
            color = (0, 0, 255) if class_name.lower() == "person" else (0, 255, 0)

            left, top = cx - box_w // 2, cy - box_h // 2
            right, bottom = cx + box_w // 2, cy + box_h // 2
            cv2.rectangle(frame, (left, top), (right, bottom), color, 2)

            # Filled strip above the box carrying "<class>: <confidence>".
            label = f"{class_name}: {confidence:.2f}"
            (text_width, text_height), _ = cv2.getTextSize(label, font, 0.5, 2)
            cv2.rectangle(frame, (left, top - text_height - 5),
                          (left + text_width, top), color, -1)
            cv2.putText(frame, label, (left, top - 5), font, 0.5, (255, 255, 255), 2)

        # Raise an alert when something was detected and the interval elapsed.
        current_time = time.time()
        if detected_objects and not (current_time - last_alert_time < ALERT_INTERVAL):
            detected_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            detected_names = list(detected_objects.keys())
            caption = (
                f"Alert! Detected: {', '.join(detected_names)}\n"
                f"Time: {detected_time}\n"
                f"Location: {SITE_LOCATION}"
            )

            alert_history.append({
                "time": detected_time,
                "objects": detected_names,
                "counts": detected_objects
            })
            # Drop the oldest entry once more than 10 alerts are stored.
            if len(alert_history) > 10:
                del alert_history[0]

            # Alerts are only logged here; no call/message is dispatched.
            print(f"Alert triggered: {caption}")
            last_alert_time = current_time

        # Overlay the wall-clock time in the top-left corner.
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cv2.putText(frame, timestamp, (10, 30), font, 0.7, (255, 255, 255), 2)

        encoded_ok, jpeg = cv2.imencode('.jpg', frame)
        if not encoded_ok:
            continue

        yield b"".join((
            b'--frame\r\n',
            b'Content-Type: image/jpeg\r\n\r\n',
            jpeg.tobytes(),
            b'\r\n',
        ))

        # Short pause to limit the frame rate.
        time.sleep(0.05)
# Without a route registration Flask never dispatches to this view.
@app.route('/')
def index():
    """Serve the main page rendered from the ``index.html`` template."""
    return render_template('index.html')
# Without a route registration Flask never dispatches to this view.
# NOTE(review): URL path assumed from the function name -- confirm against
# what templates/index.html requests.
@app.route('/video_feed')
def video_feed():
    """Stream the annotated frames from ``gen_frames`` as multipart JPEG."""
    return Response(gen_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')
# Without a route registration Flask never dispatches to this view.
# NOTE(review): URL path assumed from the function name -- confirm against
# what the front-end requests.
@app.route('/detection_data')
def detection_data():
    """Return the current detection counts as JSON, omitting zero counts."""
    filtered_counts = {k: v for k, v in detection_counts.items() if v > 0}
    return jsonify(filtered_counts)
# Without a route registration Flask never dispatches to this view.
# NOTE(review): URL path is an assumption -- confirm against what the
# front-end requests.
@app.route('/alert_history')
def get_alert_history():
    """Return the stored alert history as JSON."""
    return jsonify(alert_history)
# Without a route registration Flask never dispatches to this view.
# NOTE(review): URL path and HTTP method are assumptions; both GET and POST
# are accepted so either style of front-end call works -- confirm and narrow.
@app.route('/reset_counts', methods=['GET', 'POST'])
def reset_counts():
    """Reset every per-class detection count to zero and confirm via JSON."""
    global detection_counts
    detection_counts = {cls: 0 for cls in PROJECT_CLASSES}
    return jsonify({"status": "success", "message": "Detection counts reset"})
# Dummy route for the Hugging Face Spaces healthcheck. Without a route
# registration Flask never dispatches to this view.
# NOTE(review): URL path assumed from the function name -- confirm which path
# the platform actually probes.
@app.route('/healthcheck')
def healthcheck():
    """Report that the service is up."""
    return jsonify({"status": "healthy"})
if __name__ == '__main__':
    # Listen on all interfaces. The port is taken from the PORT environment
    # variable (needed for Hugging Face Spaces) and defaults to 7860.
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))