Spaces:
Sleeping
Sleeping
File size: 5,050 Bytes
282aec6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | """
LangChain YOLO Agent
---------------------
This project provides a YOLO-based object detection tool integrated with LangChain.
Users can upload any video to analyze its contents, generate object detection logs,
and visualize detections with bounding boxes.
Steps:
1) Install dependencies: `pip install langchain openai ultralytics opencv-python`
2) Add this file (`yolo_agent.py`) to your project.
3) Ensure that the YOLO model file (`last.pt`) is available in the working directory.
4) Use the provided functions to analyze uploaded videos dynamically.
"""
import os
import cv2
import shutil
from langchain.agents import Tool, tool
from ultralytics import YOLO
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Update the model path to point to the "yolo" directory
MODEL_PATH = os.path.join(os.path.dirname(__file__), "last.pt")
def detect_with_yolo(
video_path: str,
model_path: str = os.path.join(os.path.dirname(__file__), "last.pt"),
output_dir: str = "detections",
frame_skip: int = 10,
conf: float = 0.7
) -> str:
"""
Runs YOLO detection on the given video.
- Detects only class_id 0..5 (Danger / Handgun / Knife, etc.)
- Draws red bounding boxes
- Saves logs to a text file
- Saves detected frames as images
"""
if not os.path.exists(video_path):
return f"Video not found: {video_path}"
try:
model = YOLO(model_path)
except Exception as e:
return f"Failed to load model: {e}"
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return f"Cannot open video: {video_path}"
os.makedirs(output_dir, exist_ok=True)
output_txt = os.path.join(output_dir, "detections.txt")
frame_count = 0
with open(output_txt, "w") as ftxt:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
results = model(frame, conf=0.8)
detections = (
results[0].boxes.data.cpu().numpy() if len(results) > 0 else []
)
valid_detections = [det for det in detections if int(det[5]) in [0, 1, 2, 3, 4, 5]]
if len(valid_detections) > 0:
for det in valid_detections:
x1, y1, x2, y2, conf, cls_ = det
class_id = int(cls_)
if class_id in [0, 1, 2]:
class_label = "Danger"
elif class_id in [3, 4, 5]:
class_label = model.names.get(class_id, f"Class {class_id}")
else:
class_label = f"Class {class_id}"
cv2.rectangle(
frame,
(int(x1), int(y1)),
(int(x2), int(y2)),
(0, 0, 255),
3,
)
(w, h), _ = cv2.getTextSize(class_label, cv2.FONT_HERSHEY_COMPLEX, 0.8, 2)
label_x1 = int(x1)
label_y2 = int(y1)
label_y1 = label_y2 - h - 10
label_x2 = label_x1 + w + 10
cv2.rectangle(
frame,
(label_x1, label_y1),
(label_x2, label_y2),
(0, 0, 255),
cv2.FILLED,
)
cv2.putText(
frame,
class_label,
(label_x1 + 5, label_y1 + h + 5),
cv2.FONT_HERSHEY_COMPLEX,
0.85,
(255, 255, 255),
2,
cv2.LINE_AA,
)
ftxt.write(
f"Frame {frame_count}: {class_label} at ({int(x1)}, {int(y1)}, {int(x2)}, {int(y2)})\n"
)
output_frame_path = os.path.join(output_dir, f"frame_{frame_count}.jpg")
cv2.imwrite(output_frame_path, frame)
frame_count += frame_skip
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count)
cap.release()
cv2.destroyAllWindows()
return f"Processing complete. Outputs saved in '{output_dir}' and '{output_txt}'."
@tool("video_detection_tool", return_direct=True)
def video_detection_tool(video) -> str:
"""
Handles video uploads dynamically and runs YOLO detection.
Saves results in the "detections" folder and logs to detections.txt.
"""
video_path = os.path.join(UPLOAD_FOLDER, os.path.basename(video.name))
# Update the model path used here too
MODEL_PATH = os.path.join(os.path.dirname(__file__), "last.pt")
shutil.copy(video.name, video_path)
try:
result = detect_with_yolo(video_path)
except Exception as e:
return f"Error during detection: {e}"
return result
if __name__ == "__main__":
print("LangChain YOLO Agent Ready!")
__all__ = ["video_detection_tool"]
|