| | import os |
| | import requests |
| | from dotenv import load_dotenv |
| | import boto3 |
| | import supabase |
| | import cv2 |
| |
|
# Cloudflare R2 (S3-compatible) settings, read from the process environment.
# load_dotenv() runs first so that values kept in a local .env file are visible
# to os.getenv; by default it does not override variables that are already set.
# Each constant is None when the corresponding variable is missing.
load_dotenv()

R2_ACCESS_KEY = os.getenv('R2_ACCESS_KEY')
R2_SECRET_KEY = os.getenv('R2_SECRET_KEY')
R2_BUCKET_NAME = os.getenv('R2_BUCKET_NAME')
R2_ENDPOINT_URL = os.getenv('R2_ENDPOINT_URL')
| |
|
def download_video(video_url):
    """Download a video over HTTP into ``./input`` and return its local path.

    The local file name is the last ``/``-separated segment of ``video_url``
    with ``.mp4`` appended (the suffix is added even when the segment already
    carries an extension).

    Args:
        video_url: URL of the video to fetch.

    Returns:
        The path of the written file, of the form ``./input/<name>.mp4``.

    Raises:
        Exception: If the server responds with a status code other than 200.
        requests.RequestException: On connection failures or timeouts.
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs('./input', exist_ok=True)
    print(f'Downloading video from {video_url}')

    # (connect, read) timeouts keep a stalled server from hanging the caller;
    # the context manager releases the streamed connection on every exit path.
    with requests.get(video_url, stream=True, timeout=(10, 60)) as response:
        if response.status_code != 200:
            raise Exception(f"Failed to download video: {response.status_code}")

        video_name = video_url.split('/')[-1]
        print(video_name)
        video_path = f'./input/{video_name}.mp4'
        print(video_path)
        with open(video_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip empty chunks
                    f.write(chunk)
    return video_path
| |
|
def download_file(url, path):
    """Download ``url`` into the directory ``path`` and return the file path.

    The local file name is the last ``/``-separated segment of ``url`` with
    ``.mp4`` appended; the suffix is added unconditionally, whatever the
    content actually is.

    Args:
        url: URL of the file to fetch.
        path: Destination directory; created if it does not exist. Relative
            paths are resolved against the current working directory.

    Returns:
        The path of the written file. For a relative ``path`` this keeps the
        ``./<path>/<name>.mp4`` form.

    Raises:
        Exception: If the server responds with a status code other than 200.
        requests.RequestException: On connection failures or timeouts.
    """
    # Write into the same directory that gets created: prefixing "./" to an
    # absolute path would point at a different location under the cwd.
    directory = path if os.path.isabs(path) else os.path.join('.', path)
    os.makedirs(directory, exist_ok=True)
    print(f'Downloading file from {url} to {path}')

    # (connect, read) timeouts keep a stalled server from hanging the caller;
    # the context manager releases the streamed connection on every exit path.
    with requests.get(url, stream=True, timeout=(10, 60)) as response:
        if response.status_code != 200:
            raise Exception(f"Failed to download file: {response.status_code}")

        file_name = url.split('/')[-1]
        file_path = os.path.join(directory, f'{file_name}.mp4')
        with open(file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip empty chunks
                    f.write(chunk)
    return file_path
| | |
| |
|
def upload_file(file_path, bucket_name, object_name, endpoint_url, access_key, secret_key):
    """Upload a local file to an S3-compatible bucket.

    Args:
        file_path: Path of the local file to send.
        bucket_name: Name of the destination bucket.
        object_name: Key under which the object is stored.
        endpoint_url: Endpoint URL handed to the boto3 S3 client.
        access_key: Access key id for the endpoint.
        secret_key: Secret access key for the endpoint.

    Returns:
        Whatever the client's ``upload_file`` call returns on success.
        If the upload raises, the error is printed (not re-raised) and the
        function returns ``None``.
    """
    client = boto3.client(
        's3',
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    try:
        result = client.upload_file(file_path, bucket_name, object_name)
        print(f'{file_path} uploaded to {bucket_name}/{object_name}')
        return result
    except Exception as exc:
        # Best effort: report the failure and fall through to return None.
        print(f'Error uploading file: {exc}')
    return None
| |
|
| |
|
def detect_faces_frames(video_url, sample_interval_seconds=5):
    """Sample frames from a remote video, mark detected faces, and upload them.

    The video is fetched with ``download_video``. One frame per
    ``sample_interval_seconds`` of footage is converted to grayscale, run
    through the cascade classifier loaded from ``./utils/face_detection.xml``,
    annotated with a rectangle around each detection, written to ``./output``
    as ``<video_name>_<index>.jpg`` and uploaded with ``upload_file``.

    Args:
        video_url: URL of the video to analyse. Its last ``/``-separated
            segment is used as the prefix of the frame file names.
        sample_interval_seconds: Seconds of footage between sampled frames.
            Defaults to 5.

    Returns:
        A list of public URLs, one per frame image that was written locally.
        ``upload_file`` only prints upload errors, so a URL is returned even
        if the corresponding upload failed.

    Raises:
        ValueError: If ``sample_interval_seconds`` is not positive.
        Exception: If the video cannot be downloaded, cannot be opened, or
            reports no usable frame rate.
    """
    if sample_interval_seconds <= 0:
        raise ValueError("sample_interval_seconds must be positive")

    upload_endpoint = 'https://c98643a1da5e9aa06b27b8bb7eb9227a.r2.cloudflarestorage.com/warden-ai'
    upload_bucket = 'outputs'

    video_name = video_url.split('/')[-1]
    print(video_name)
    video_path = download_video(video_url)

    # cv2.imwrite does not create missing directories; it only reports failure.
    os.makedirs('./output', exist_ok=True)

    face_cascade = cv2.CascadeClassifier('./utils/face_detection.xml')
    cap = cv2.VideoCapture(video_path)

    frames = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # "not fps > 0" also rejects NaN; without this guard the duration and
        # the sampling step below would divide by zero.
        if not cap.isOpened() or not fps > 0:
            raise Exception(f"Failed to open video or read its frame rate: {video_path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps
        # At least 1 so the modulo below never divides by zero for very low
        # frame rates or very short intervals.
        frame_step = max(1, int(fps * sample_interval_seconds))

        frame_count = 0   # frames read so far
        sample_index = 0  # sampled frames so far; used in the file name
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_step == 0:
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

                for (x, y, w, h) in faces:
                    cv2.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 2)

                frame_name = f"./output/{video_name}_{sample_index}.jpg"
                print(frame_name)
                # Only keep frames that actually reached the disk, so nothing
                # nonexistent is uploaded or returned.
                if cv2.imwrite(frame_name, frame):
                    frames.append(frame_name)
                else:
                    print(f"Failed to write frame: {frame_name}")
                sample_index += 1

            frame_count += 1
    finally:
        # Release the capture even when reading or detection raises.
        # No GUI windows are created here, so there is nothing to destroy.
        cap.release()

    print(f"Total video duration: {duration:.2f} seconds")
    print(f"Total frames processed: {sample_index}")

    res = []
    for frame_path in frames:
        object_name = frame_path.split('/')[-1]
        upload_file(frame_path, upload_bucket, object_name, upload_endpoint, R2_ACCESS_KEY, R2_SECRET_KEY)
        res.append(f'https://pub-08a118f4cb7c4b208b55e6877b0bacca.r2.dev/outputs/{object_name}')

    return res
| |
|
| |
|