Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import tempfile | |
| from fastapi import FastAPI, UploadFile, File | |
| from fastapi.responses import FileResponse | |
| from ultralytics import YOLO | |
| import supervision as sv | |
# --- MODEL AND APP INITIALIZATION ---
# Both objects are created at import time, so the weights file is read a
# single time when the server starts rather than once per request.
MODEL_PATH = "best.pt"

app = FastAPI(title="YOLOv8 Tracking API")
model = YOLO(MODEL_PATH)
# --- VIDEO PROCESSING LOGIC ---
def process_video_logic(input_path, output_path, json_path):
    """Detect, track and annotate objects in a video, then export the tracks.

    Each frame read from ``input_path`` is run through the module-level
    ``model``; the detections are handed to a ByteTrack tracker, and the frame
    is written to ``output_path`` with boxes and ``ID: <track id> <class>``
    labels drawn on a copy of it. One record per tracked detection per frame
    is collected and finally dumped to ``json_path`` as an indented JSON list.

    Args:
        input_path: Path of the video to read.
        output_path: Path the annotated video is written to.
        json_path: Path the per-detection JSON records are written to.
    """
    byte_tracker = sv.ByteTrack()
    box_painter = sv.BoxAnnotator(thickness=5)
    label_painter = sv.LabelAnnotator(
        text_position=sv.Position.TOP_CENTER,
        text_scale=1,
        text_thickness=1,
    )
    frames = sv.get_video_frames_generator(source_path=input_path)
    source_info = sv.VideoInfo.from_video_path(input_path)

    records = []
    with sv.VideoSink(target_path=output_path, video_info=source_info) as writer:
        for frame_index, image in enumerate(frames):
            prediction = model(image, verbose=False)[0]
            tracked = byte_tracker.update_with_detections(
                detections=sv.Detections.from_ultralytics(prediction)
            )

            # One pass over the tracked detections yields both the on-screen
            # captions and the JSON records for this frame.
            captions = []
            frame_records = []
            for item in tracked:
                # Positions used here: 0 = box, 2 = confidence,
                # 3 = class id, 4 = tracker id.
                box, score, class_id, track_id = item[0], item[2], int(item[3]), item[4]
                class_name = model.model.names[class_id]
                captions.append(f"ID: {track_id} {class_name}")
                frame_records.append(
                    {
                        "frame_number": frame_index,
                        "track_id": int(track_id),
                        "class": class_name,
                        "confidence": float(score),
                        "bounding_box": [int(value) for value in box],
                    }
                )

            # Draw on a copy so the frame that was fed to the model is untouched.
            canvas = box_painter.annotate(scene=image.copy(), detections=tracked)
            canvas = label_painter.annotate(
                scene=canvas, detections=tracked, labels=captions
            )

            records.extend(frame_records)
            writer.write_frame(frame=canvas)

    with open(json_path, 'w') as json_file:
        json.dump(records, json_file, indent=4)
# --- API ENDPOINT ---
# NOTE(review): no route decorator (e.g. ``@app.post(...)``) is visible on this
# function in the section under review; confirm it is registered on ``app``.
async def track_video_endpoint(video: UploadFile = File(...)):
    """Track objects in an uploaded video and return the annotated video.

    The upload is saved into a private working directory, processed by
    ``process_video_logic``, and the annotated video is sent back as
    ``output.mp4``. The JSON results file is written next to it but is not
    part of the response.

    The working directory is removed by a background task that runs after the
    response has been sent, or immediately if saving or processing fails.

    Args:
        video: The uploaded video file.

    Returns:
        A ``FileResponse`` that serves the annotated video as ``video/mp4``.
    """
    # Function-scope imports keep this block self-contained.
    import shutil

    from fastapi import BackgroundTasks

    chunk_size = 1024 * 1024

    # The filename is client-controlled: keep only its extension so the saved
    # path can neither escape the working directory nor collide with
    # "output.mp4". A missing filename simply yields no extension.
    extension = os.path.splitext(os.path.basename(video.filename or ""))[1]

    # A ``with TemporaryDirectory()`` block would delete the output file when
    # the handler returns, which is before FileResponse reads it. The
    # directory is therefore created manually and cleaned up explicitly.
    work_dir = tempfile.mkdtemp()
    try:
        input_path = os.path.join(work_dir, f"input{extension}")
        output_video_path = os.path.join(work_dir, "output.mp4")
        output_json_path = os.path.join(work_dir, "results.json")

        # Copy the upload in chunks instead of holding it all in memory.
        with open(input_path, "wb") as buffer:
            while True:
                chunk = await video.read(chunk_size)
                if not chunk:
                    break
                buffer.write(chunk)

        process_video_logic(input_path, output_video_path, output_json_path)
    except BaseException:
        # Nothing will be served, so do not leave the directory behind
        # (this also covers request cancellation).
        shutil.rmtree(work_dir, ignore_errors=True)
        raise

    # Runs after the response body has been sent.
    cleanup = BackgroundTasks()
    cleanup.add_task(shutil.rmtree, work_dir, ignore_errors=True)

    return FileResponse(
        output_video_path,
        media_type="video/mp4",
        filename="output.mp4",
        background=cleanup,
    )