Spaces:
Running
Running
| import os | |
| import cv2 | |
| import json | |
| from flask import Flask, request, jsonify, render_template, send_from_directory | |
| from flask_cors import CORS | |
| from werkzeug.utils import secure_filename | |
| from ultralytics import YOLO | |
| app = Flask(__name__) | |
| CORS(app) | |
| # Configuration | |
| UPLOAD_FOLDER = 'uploads' | |
| RESULT_FOLDER = 'static/results' | |
| ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'mp4', 'avi', 'mov'} | |
| app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER | |
| app.config['RESULT_FOLDER'] = RESULT_FOLDER | |
| # Create directories if they don't exist | |
| os.makedirs(UPLOAD_FOLDER, exist_ok=True) | |
| os.makedirs(RESULT_FOLDER, exist_ok=True) | |
| # Load YOLOv8 model (pre-trained on COCO) | |
| # We use the nano version for speed in this environment | |
| model = YOLO('yolov8n.pt') | |
| def allowed_file(filename): | |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
| def index(): | |
| return render_template('index.html') | |
| def detect_objects(): | |
| if 'file' not in request.files: | |
| return jsonify({'error': 'No file part'}), 400 | |
| file = request.files['file'] | |
| if file.filename == '': | |
| return jsonify({'error': 'No selected file'}), 400 | |
| if file and allowed_file(file.filename): | |
| filename = secure_filename(file.filename) | |
| filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) | |
| file.save(filepath) | |
| # Determine file type | |
| ext = filename.rsplit('.', 1)[1].lower() | |
| if ext in {'mp4', 'avi', 'mov'}: | |
| return process_video(filepath, filename) | |
| else: | |
| return process_image(filepath, filename) | |
| return jsonify({'error': 'File type not allowed'}), 400 | |
| def process_image(filepath, filename): | |
| # Run inference | |
| results = model(filepath) | |
| # Get the first result (since we only processed one image) | |
| result = results[0] | |
| # Save the plotted image (with bounding boxes) | |
| res_filename = 'res_' + filename | |
| res_path = os.path.join(app.config['RESULT_FOLDER'], res_filename) | |
| result.save(filename=res_path) | |
| # Count objects | |
| detections = [] | |
| counts = {} | |
| for box in result.boxes: | |
| cls_id = int(box.cls[0]) | |
| label = model.names[cls_id] | |
| conf = float(box.conf[0]) | |
| detections.append({ | |
| 'label': label, | |
| 'confidence': conf, | |
| 'box': box.xyxy[0].tolist() | |
| }) | |
| counts[label] = counts.get(label, 0) + 1 | |
| return jsonify({ | |
| 'success': True, | |
| 'type': 'image', | |
| 'result_url': f'/static/results/{res_filename}', | |
| 'objects': detections, | |
| 'counts': counts, | |
| 'total_count': len(detections) | |
| }) | |
| def process_video(filepath, filename): | |
| # For video, we'll run inference and save the output video | |
| # Note: Processing a full video can be slow. | |
| # We'll use ultralytics' built-in video saving capability for simplicity | |
| res_filename = 'res_' + filename.split('.')[0] + '.mp4' | |
| res_path = os.path.join(app.config['RESULT_FOLDER'], res_filename) | |
| # Run inference on the video | |
| # Use project/name to control output location | |
| results = model(filepath, stream=True) | |
| # We need to collect overall counts for the video | |
| all_seen_objects = {} | |
| # To keep it simple and faster for the demo, we process every 5th frame if it's long? | |
| # No, let's just use the built-in save for now. | |
| # Run model.predict with save=True | |
| save_results = model.predict(filepath, save=True, project=app.config['RESULT_FOLDER'], name='vid_temp', exist_ok=True) | |
| # Find the saved video. Ultralytics saves it in a subfolder. | |
| # We want to move it to our RESULT_FOLDER with a predictable name. | |
| # Typically it goes to RESULT_FOLDER/vid_temp/filename | |
| raw_saved_path = os.path.join(app.config['RESULT_FOLDER'], 'vid_temp', filename) | |
| if os.path.exists(raw_saved_path): | |
| import shutil | |
| shutil.move(raw_saved_path, res_path) | |
| # Cleanup temp dir | |
| shutil.rmtree(os.path.join(app.config['RESULT_FOLDER'], 'vid_temp')) | |
| else: | |
| # Fallback if names differ (sometimes .avi becomes .mp4 etc) | |
| # Just check the folder | |
| temp_dir = os.path.join(app.config['RESULT_FOLDER'], 'vid_temp') | |
| if os.path.exists(temp_dir): | |
| files = os.listdir(temp_dir) | |
| if files: | |
| shutil.move(os.path.join(temp_dir, files[0]), res_path) | |
| shutil.rmtree(temp_dir) | |
| # For video summary, we'll just run a quick pass to get unique objects | |
| # (In a real app, you'd aggregate frame-by-frame) | |
| # Just return success for now with the URL | |
| return jsonify({ | |
| 'success': True, | |
| 'type': 'video', | |
| 'result_url': f'/static/results/{res_filename}', | |
| 'message': 'Video processed successfully' | |
| }) | |
| if __name__ == '__main__': | |
| app.run(debug=True, port=5000) | |