Sankie005's picture
Upload 434 files
c446951
import cv2
import pandas as pd
import pickle
import requests
import matplotlib.pyplot as plt
import argparse
import os
def parse_args():
parser = argparse.ArgumentParser(description="Process video and extract insights")
parser.add_argument("--dataset_id", help="Dataset ID (required)")
parser.add_argument("--version_id", default="1", help="Version ID (default: 1)")
parser.add_argument("--api_key", help="API key (required)")
parser.add_argument("--video_path", help="Path to the video (required)")
parser.add_argument(
"--interval_minutes",
type=int,
default=1,
help="Interval in seconds (default: 60)",
)
return parser.parse_args()
def extract_frames(video_path, interval_minutes):
cap = cv2.VideoCapture(video_path)
frames = []
timestamps = []
fps = int(cap.get(cv2.CAP_PROP_FPS))
frame_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if frame_count % (fps * interval_minutes) == 0:
frames.append(frame)
timestamps.append(frame_count / fps)
frame_count += 1
cap.release()
return frames, timestamps
def fetch_predictions(
base_url, frames, timestamps, dataset_id, version_id, api_key, confidence=0.5
):
headers = {"Content-Type": "application/x-www-form-urlencoded"}
df_rows = []
for idx, frame in enumerate(frames):
numpy_data = pickle.dumps(frame)
res = requests.post(
f"{base_url}/{dataset_id}/{version_id}",
data=numpy_data,
headers=headers,
params={
"api_key": api_key,
"confidence": confidence,
"image_type": "numpy",
},
)
predictions = res.json()
for pred in predictions["predictions"]:
time_interval = (
f"{int(timestamps[idx] // 60)}:{int(timestamps[idx] % 60):02}"
)
row = {
"timestamp": time_interval,
"time": predictions["time"],
"x": pred["x"],
"y": pred["y"],
"width": pred["width"],
"height": pred["height"],
"pred_confidence": pred["confidence"],
"class": pred["class"],
}
df_rows.append(row)
df = pd.DataFrame(df_rows)
df["seconds"] = (
df["timestamp"].str.split(":").apply(lambda x: int(x[0]) * 60 + int(x[1]))
)
df = df.sort_values(by="seconds")
return df
def plot_and_save(
data,
title,
filename,
ylabel,
stacked=False,
legend_title=None,
legend_loc=None,
legend_bbox=None,
):
plt.style.use("dark_background")
data.plot(kind="bar", stacked=stacked, figsize=(15, 7))
plt.title(title)
plt.ylabel(ylabel)
plt.xlabel("Timestamp (in minutes:seconds)")
if legend_title:
plt.legend(title=legend_title, loc=legend_loc, bbox_to_anchor=legend_bbox)
plt.tight_layout()
plt.savefig(filename)
def main():
args = parse_args()
base_url = "http://localhost:9001"
video_path = args.video_path
dataset_id = args.dataset_id
version_id = args.version_id
api_key = args.api_key
interval_minutes = args.interval_minutes * 60
frames, timestamps = extract_frames(video_path, interval_minutes)
df = fetch_predictions(
base_url, frames, timestamps, dataset_id, version_id, api_key
)
if not os.path.exists("results"):
os.makedirs("results")
# saving predictions response to csv
df.to_csv("results/predictions.csv", index=False)
# Transform timestamps to minutes and group
df["minutes"] = (
df["timestamp"].str.split(":").apply(lambda x: int(x[0]) * 60 + int(x[1]))
)
object_counts_per_interval = df.groupby("minutes").size().sort_index()
object_counts_per_interval.index = object_counts_per_interval.index.map(
lambda x: f"{x // 60}:{x % 60:02}"
)
object_counts_per_interval.to_csv("results/object_counts_per_interval.csv")
# Quick insights
print(f"Total unique objects detected: {df['class'].nunique()}")
print(f"Most frequently detected object: {df['class'].value_counts().idxmax()}")
print(
f"Time interval with the most objects detected: {object_counts_per_interval.idxmax()}"
)
print(
f"Time interval with the least objects detected: {object_counts_per_interval.idxmin()}"
)
plot_and_save(
object_counts_per_interval,
"Number of Objects Detected Over Time",
"results/objects_over_time_d.png",
"Number of Objects",
)
# Group by timestamp and class, then sort by minutes
objects_by_class_per_interval = (
df.groupby(["minutes", "class"]).size().unstack(fill_value=0).sort_index()
)
objects_by_class_per_interval.index = objects_by_class_per_interval.index.map(
lambda x: f"{x // 60}:{x % 60:02}"
)
objects_by_class_per_interval.to_csv(
"results/object_counts_by_class_per_interval.csv"
)
plot_and_save(
objects_by_class_per_interval,
"Number of Objects Detected Over Time by Class",
"results/objects_by_class_over_time.png",
"Number of Objects",
True,
"Object Class",
"center left",
(1, 0.5),
)
if __name__ == "__main__":
main()