Spaces:
Sleeping
Sleeping
File size: 2,999 Bytes
a954bca | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 | import os
import json
import tempfile
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import FileResponse
from ultralytics import YOLO
import supervision as sv
# --- MODEL AND APP INITIALIZATION ---
# Path of the YOLO weights file. Defaults to "best.pt"; set the
# YOLO_MODEL_PATH environment variable to point at different weights without
# editing this file.
MODEL_PATH = os.environ.get("YOLO_MODEL_PATH", "best.pt")
# The model is loaded once at import time, so requests reuse the same instance
# instead of reloading the weights on every call.
model = YOLO(MODEL_PATH)
app = FastAPI(title="YOLOv8 Tracking API")
# --- VIDEO PROCESSING LOGIC ---
# Detection, tracking, annotation and JSON export for a single video file.
def process_video_logic(input_path, output_path, json_path):
    """Detect and track objects in a video; write an annotated copy and a JSON log.

    Every frame of ``input_path`` is run through the module-level ``model``,
    the detections are passed to a ByteTrack tracker, and the frame is drawn
    with boxes and "ID: <track id> <class name>" labels before being written
    to ``output_path``.

    Args:
        input_path: Path of the video to read frames from.
        output_path: Path the annotated video is written to.
        json_path: Path of the JSON file that receives one record per tracked
            detection per frame (frame number, track id, class name,
            confidence, integer bounding box).
    """
    class_names = model.model.names
    byte_tracker = sv.ByteTrack()
    boxes = sv.BoxAnnotator(thickness=5)
    captions = sv.LabelAnnotator(
        text_position=sv.Position.TOP_CENTER,
        text_scale=1,
        text_thickness=1,
    )
    frames = sv.get_video_frames_generator(source_path=input_path)
    info = sv.VideoInfo.from_video_path(input_path)
    records = []

    with sv.VideoSink(target_path=output_path, video_info=info) as sink:
        for frame_number, frame in enumerate(frames):
            prediction = model(frame, verbose=False)[0]
            tracked = byte_tracker.update_with_detections(
                detections=sv.Detections.from_ultralytics(prediction)
            )

            # Each iterated detection is indexed positionally:
            # [0] box, [2] confidence, [3] class id, [4] tracker id.
            labels = []
            for det in tracked:
                class_name = class_names[int(det[3])]
                labels.append(f"ID: {det[4]} {class_name}")
                records.append(
                    {
                        "frame_number": frame_number,
                        "track_id": int(det[4]),
                        "class": class_name,
                        "confidence": float(det[2]),
                        "bounding_box": [int(value) for value in det[0]],
                    }
                )

            # Draw on a copy so the source frame itself is left unmodified.
            canvas = boxes.annotate(scene=frame.copy(), detections=tracked)
            canvas = captions.annotate(
                scene=canvas, detections=tracked, labels=labels
            )
            sink.write_frame(frame=canvas)

    with open(json_path, "w") as json_file:
        json.dump(records, json_file, indent=4)
# --- API ENDPOINT ---
@app.post("/track/")
async def track_video_endpoint(video: UploadFile = File(...)):
    """Track objects in an uploaded video and return the annotated MP4.

    The upload is saved into a fresh temporary directory, processed by
    ``process_video_logic``, and the annotated video is returned as a
    download named ``output.mp4``. The JSON results file is written into the
    same directory but is not part of the response.

    Args:
        video: The uploaded video file (multipart form field ``video``).

    Returns:
        A ``FileResponse`` streaming the annotated video. The temporary
        directory is removed by a background task after the response is sent.
    """
    # Function-scope imports keep this block self-contained.
    import shutil

    from fastapi import BackgroundTasks

    # mkdtemp instead of a TemporaryDirectory context manager: the response
    # reads the file only after this function has returned, so the directory
    # must outlive the function and be removed afterwards.
    temp_dir = tempfile.mkdtemp()
    try:
        # Never trust the client-supplied filename as a path: keep only its
        # extension. This blocks path traversal and absolute paths, tolerates
        # a missing filename, and cannot collide with "output.mp4".
        suffix = os.path.splitext(os.path.basename(video.filename or ""))[1]
        input_path = os.path.join(temp_dir, f"input{suffix}")
        output_video_path = os.path.join(temp_dir, "output.mp4")
        output_json_path = os.path.join(temp_dir, "results.json")

        # Copy the upload to disk in chunks rather than reading it all into memory.
        with open(input_path, "wb") as buffer:
            shutil.copyfileobj(video.file, buffer)

        # NOTE(review): this call is synchronous, so it runs on the event loop
        # and other requests wait until processing finishes.
        process_video_logic(input_path, output_video_path, output_json_path)
    except Exception:
        # Nothing will be streamed, so clean up right away and let the error propagate.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    cleanup = BackgroundTasks()
    cleanup.add_task(shutil.rmtree, temp_dir, ignore_errors=True)
    return FileResponse(
        output_video_path,
        media_type="video/mp4",
        filename="output.mp4",
        background=cleanup,
    )
|