- app.py +0 -124
- backend.py +66 -0
- gradio_ui.py +56 -0
app.py
DELETED
|
@@ -1,124 +0,0 @@
|
|
| 1 |
-
import gradio as gr
|
| 2 |
-
|
| 3 |
-
|
| 4 |
-
|
| 5 |
-
import os
|
| 6 |
-
import json
|
| 7 |
-
from ultralytics import YOLO
|
| 8 |
-
import supervision as sv
|
| 9 |
-
|
| 10 |
-
|
| 11 |
-
# --- 1. CONFIGURATION ---

# Folder where the script is (app/)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Go up one level to the project root (presumably the Om_Singh folder — see layout)
PROJECT_DIR = os.path.dirname(BASE_DIR)  # parent folder of app/

# Paths for outputs in the video_and_json folder next to app/
VIDEO_DIR = os.path.join(PROJECT_DIR, "video_and_json")  # <project>/video_and_json
os.makedirs(VIDEO_DIR, exist_ok=True)  # ensure folder exists on first run

MODEL_PATH = os.path.join(BASE_DIR, "best.pt")  # model weights stay in app/
OUTPUT_VIDEO_PATH = os.path.join(VIDEO_DIR, "output.mp4")
OUTPUT_JSON_PATH = os.path.join(VIDEO_DIR, "result.json")
|
| 26 |
-
|
| 27 |
-
def process_video(INPUT_VIDEO_PATH, OUTPUT_VIDEO_PATH, OUTPUT_JSON_PATH):
    """Run YOLO detection + ByteTrack tracking over a whole video.

    Writes an annotated copy of the video to ``OUTPUT_VIDEO_PATH`` and a JSON
    list of per-frame track records to ``OUTPUT_JSON_PATH``.

    Args:
        INPUT_VIDEO_PATH: path of the source video to process.
        OUTPUT_VIDEO_PATH: path the annotated video is written to.
        OUTPUT_JSON_PATH: path the tracking-results JSON is written to.
    """
    print("Loading model...")
    # Weights always come from the module-level MODEL_PATH (app/best.pt),
    # regardless of the output paths passed in.
    model = YOLO(MODEL_PATH)

    print("Initializing tracker and annotators...")
    tracker = sv.ByteTrack()
    box_annotator = sv.BoxAnnotator(thickness=5)
    label_annotator = sv.LabelAnnotator(text_position=sv.Position.TOP_CENTER, text_scale=1, text_thickness=1)
    # BUG FIX: removed a stray `sv.LabelAnnotator()` call here whose result
    # was constructed and immediately discarded (dead code).

    frame_generator = sv.get_video_frames_generator(source_path=INPUT_VIDEO_PATH)
    video_info = sv.VideoInfo.from_video_path(INPUT_VIDEO_PATH)

    results_list = []

    with sv.VideoSink(target_path=OUTPUT_VIDEO_PATH, video_info=video_info) as sink:
        print("Processing video frames...")
        for frame_number, frame in enumerate(frame_generator):
            # Run YOLO prediction on this frame
            results = model(frame)[0]
            detections = sv.Detections.from_ultralytics(results)

            # Update tracker so detections carry stable track IDs
            tracked_detections = tracker.update_with_detections(detections=detections)

            # Iterating Detections yields (xyxy, mask, confidence, class_id,
            # tracker_id) tuples — det[3] is class_id, det[4] is tracker_id.
            labels = [
                f"ID: {det[4]} {model.model.names[int(det[3])]}"
                for det in tracked_detections
            ]

            # Annotate frame (boxes first, then labels on top)
            annotated_frame = box_annotator.annotate(scene=frame.copy(), detections=tracked_detections)
            annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=tracked_detections, labels=labels)

            # Save tracking info for the JSON report
            for det in tracked_detections:
                bbox = det[0]
                conf = det[2]
                class_id = int(det[3])
                tracker_id = det[4]

                results_list.append({
                    "frame_number": frame_number,
                    "track_id": int(tracker_id),
                    "class": model.model.names[class_id],
                    "confidence": float(conf),
                    "bounding_box": [int(coord) for coord in bbox]
                })

            # Write annotated frame to the output video
            sink.write_frame(frame=annotated_frame)

            # Lightweight progress log (roughly once per second at 30 fps)
            if frame_number % 30 == 0:
                print(f"Processed frame {frame_number}...")

    print("Video processing complete. Saving results...")
    with open(OUTPUT_JSON_PATH, 'w') as f:
        json.dump(results_list, f, indent=4)

    print("--- All tasks finished successfully! ---")
|
| 89 |
-
|
| 90 |
-
|
| 91 |
-
|
| 92 |
-
# --- Main processing function ---
def process(input_video):
    """Gradio handler: run tracking and stream status updates to the UI.

    A generator that yields ``(status_html, output_video, output_json)``
    tuples; Gradio pushes each yield into the three bound output components.

    Args:
        input_video: path of the uploaded video (supplied by gr.Video).
    """
    output_video = "output.mp4"
    output_json = "result.json"

    # BUG FIX: the red "Processing..." status was assigned but never shown —
    # the original function only *returned* once all work was done, so the
    # intermediate assignment was dead code. Yielding makes this a Gradio
    # generator, so the UI updates immediately before processing starts.
    yield "<p style='color:red; font-weight:bold;'>Processing...</p>", None, None

    # Run video processing (blocking; may take a while for long videos)
    process_video(input_video, output_video, output_json)

    # After processing: green text plus the finished artifacts
    yield "<p style='color:limegreen; font-weight:bold;'>Processing complete!</p>", output_video, output_json
|
| 106 |
-
|
| 107 |
-
# --- Gradio UI ---
# NOTE: component creation order defines the page layout, and the name
# `demo` is the conventional entry point (e.g. Hugging Face Spaces looks
# for it) — do not rename or reorder casually.
with gr.Blocks() as demo:
    gr.Markdown("<h1 style='text-align:center;'>Vehicle and Pedestrian Tracker</h1>")

    input_video = gr.Video(label="Upload Video")
    start_btn = gr.Button("Start Tracking")
    status_display = gr.HTML("")  # Initially empty; filled by process()
    output_video = gr.Video(label="Processed Video")
    output_json = gr.File(label="Download JSON Output")

    # Wire the button: process() fills status, video and JSON in that order.
    start_btn.click(
        fn=process,
        inputs=input_video,
        outputs=[status_display, output_video, output_json]
    )

# share=True asks Gradio for a public *.gradio.live tunnel URL when run directly.
if __name__ == "__main__":
    demo.launch(share = True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
backend.py
ADDED
|
@@ -0,0 +1,66 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import json
import os
import shutil
import tempfile

from fastapi import FastAPI, UploadFile, File
from fastapi.responses import FileResponse
from starlette.background import BackgroundTask
from ultralytics import YOLO

import supervision as sv
|
| 8 |
+
|
| 9 |
+
# --- MODEL AND APP INITIALIZATION ---
# The model is loaded only ONCE when the server starts, making it fast.
# NOTE(review): path is relative, so best.pt must sit in the server's
# working directory at startup — confirm the deployment layout.
MODEL_PATH = "best.pt"
model = YOLO(MODEL_PATH)

app = FastAPI(title="YOLOv8 Tracking API")
|
| 15 |
+
|
| 16 |
+
# --- YOUR EXISTING PROCESSING LOGIC (UNCHANGED) ---
# We just put your code inside this function.
def process_video_logic(input_path, output_path, json_path):
    """Detect, track and annotate every frame of a video.

    Args:
        input_path: source video file to process.
        output_path: annotated video (mp4) written here.
        json_path: per-frame tracking records written here as a JSON list.
    """
    tracker = sv.ByteTrack()
    box_annotator = sv.BoxAnnotator(thickness=5)
    label_annotator = sv.LabelAnnotator(text_position=sv.Position.TOP_CENTER, text_scale=1, text_thickness=1)
    frame_generator = sv.get_video_frames_generator(source_path=input_path)
    video_info = sv.VideoInfo.from_video_path(input_path)
    results_list = []

    with sv.VideoSink(target_path=output_path, video_info=video_info) as sink:
        for frame_number, frame in enumerate(frame_generator):
            # verbose=False suppresses ultralytics' per-frame console logging
            results = model(frame, verbose=False)[0]
            detections = sv.Detections.from_ultralytics(results)
            tracked_detections = tracker.update_with_detections(detections=detections)
            # Iterating Detections yields (xyxy, mask, confidence, class_id,
            # tracker_id) tuples, hence the positional indices below.
            labels = [f"ID: {det[4]} {model.model.names[int(det[3])]}" for det in tracked_detections]
            annotated_frame = box_annotator.annotate(scene=frame.copy(), detections=tracked_detections)
            annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=tracked_detections, labels=labels)

            # Collect one JSON record per tracked detection in this frame
            for det in tracked_detections:
                bbox, conf, class_id, tracker_id = det[0], det[2], int(det[3]), det[4]
                results_list.append({
                    "frame_number": frame_number,
                    "track_id": int(tracker_id),
                    "class": model.model.names[class_id],
                    "confidence": float(conf),
                    "bounding_box": [int(coord) for coord in bbox]
                })
            sink.write_frame(frame=annotated_frame)

    with open(json_path, 'w') as f:
        json.dump(results_list, f, indent=4)
|
| 48 |
+
|
| 49 |
+
# --- API ENDPOINT ---
@app.post("/track/")
async def track_video_endpoint(video: UploadFile = File(...)):
    """Accept an uploaded video, run tracking, return the annotated mp4.

    The JSON results are produced on disk but not returned yet; extend the
    response (e.g. a zip, or a second endpoint) if callers need them.
    """
    # Use a temporary directory to handle file operations safely
    with tempfile.TemporaryDirectory() as temp_dir:
        # basename() guards against path traversal via a crafted client filename
        safe_name = os.path.basename(video.filename or "input.mp4")
        input_path = os.path.join(temp_dir, safe_name)
        output_video_path = os.path.join(temp_dir, "output.mp4")
        output_json_path = os.path.join(temp_dir, "results.json")

        # Save the uploaded video file
        with open(input_path, "wb") as buffer:
            buffer.write(await video.read())

        # Run the existing processing logic
        process_video_logic(input_path, output_video_path, output_json_path)

        # BUG FIX: FileResponse streams the file *after* this function returns,
        # but temp_dir (and everything in it) is deleted as soon as this `with`
        # block exits — so the original code served a path that no longer
        # existed. Copy the result to a persistent temp file and remove it in
        # a background task once the response has been fully sent.
        fd, persistent_path = tempfile.mkstemp(suffix=".mp4")
        os.close(fd)
        shutil.copyfile(output_video_path, persistent_path)

    # Return the processed video as a downloadable file
    return FileResponse(
        persistent_path,
        media_type="video/mp4",
        filename="output.mp4",
        background=BackgroundTask(os.remove, persistent_path),
    )
|
gradio_ui.py
ADDED
|
@@ -0,0 +1,56 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import gradio as gr
|
| 2 |
+
import requests
|
| 3 |
+
import os
|
| 4 |
+
|
| 5 |
+
# --- IMPORTANT: UPDATE THIS URL ---
# After deploying your backend_app.py, paste its public URL here.
# It must include the /track/ route exposed by the backend's POST endpoint.
BACKEND_URL = "https://your-backend-space-name.hf.space/track/"
|
| 8 |
+
|
| 9 |
+
def track_video_via_api(input_video_path):
    """Send the uploaded video to the backend API and stream UI updates.

    A Gradio generator: yields ``(status, processed_video_path, json_path)``
    tuples so the status line updates while the request is in flight.

    Args:
        input_video_path: local path of the uploaded video, or None.
    """
    if input_video_path is None:
        # BUG FIX: the original `return None, "Please upload..."` inside a
        # generator never reaches Gradio — a `return` value in a generator
        # only rides the StopIteration — and it also supplied 2 values for 3
        # outputs, in the wrong order. Yield a proper 3-tuple instead.
        yield "Please upload a video first.", None, None
        return

    status = "Sending video to the backend for processing..."
    yield status, None, None  # Update status immediately

    try:
        with open(input_video_path, "rb") as video_file:
            files = {'video': (os.path.basename(input_video_path), video_file, 'video/mp4')}

            # Send the video to the backend API and stream the response
            with requests.post(BACKEND_URL, files=files, stream=True, timeout=300) as response:
                response.raise_for_status()

                # Save the processed video that the backend sends back
                processed_video_path = "processed_video.mp4"
                with open(processed_video_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

        # The backend currently returns only the video, so the JSON output
        # stays None; modify the backend to return JSON as well if needed.
        status = "Processing complete!"
        yield status, processed_video_path, None

    except requests.exceptions.RequestException as e:
        # Network / HTTP failure: surface the error in the status line.
        status = f"API Error: {e}"
        yield status, None, None
|
| 38 |
+
|
| 39 |
+
# --- GRADIO UI (from your original app.py) ---
# NOTE: component creation order defines the page layout, and the name
# `demo` is the conventional entry point (e.g. Hugging Face Spaces looks
# for it) — do not rename or reorder casually.
with gr.Blocks() as demo:
    gr.Markdown("<h1 style='text-align:center;'>Vehicle and Pedestrian Tracker</h1>")

    input_video = gr.Video(label="Upload Video")
    start_btn = gr.Button("Start Tracking")
    status_display = gr.HTML("")  # Initially empty
    output_video = gr.Video(label="Processed Video")
    output_json = gr.File(label="Download JSON Output")  # Note: This is not hooked up in this version

    # track_video_via_api is a generator: Gradio streams each yielded
    # (status, video, json) tuple into these three outputs in order.
    start_btn.click(
        fn=track_video_via_api,
        inputs=input_video,
        outputs=[status_display, output_video, output_json]
    )

# share=True asks Gradio for a public *.gradio.live tunnel URL when run directly.
if __name__ == "__main__":
    demo.launch(share = True)
|