| import cv2 |
| import tempfile |
| import requests |
| import os |
| from PIL import Image |
| from transformers import pipeline |
|
|
| classifier = pipeline("image-classification", model="Falconsai/nsfw_image_detection") |
|
|
|
|
| def download_file(url): |
| response = requests.get(url, stream=True) |
| tmp = tempfile.NamedTemporaryFile(delete=False) |
| for chunk in response.iter_content(1024): |
| tmp.write(chunk) |
| tmp.close() |
| return tmp.name |
|
|
|
|
| def get_video_duration(video_path): |
| cap = cv2.VideoCapture(video_path) |
| fps = cap.get(cv2.CAP_PROP_FPS) |
| frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) |
| cap.release() |
| return frame_count / fps if fps > 0 else 0 |
|
|
|
|
| def extract_frame(video_path, second): |
| cap = cv2.VideoCapture(video_path) |
| fps = cap.get(cv2.CAP_PROP_FPS) |
| frame_number = int(fps * second) |
|
|
| cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) |
| success, frame = cap.read() |
| cap.release() |
|
|
| if not success: |
| return None |
|
|
| tmp_file = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) |
| cv2.imwrite(tmp_file.name, frame) |
| return tmp_file.name |
|
|
|
|
| def get_frame_times(duration, file_size_mb): |
| if duration <= 2: |
| return [1, 2] |
|
|
| elif duration <= 10: |
| return [2] |
|
|
| elif duration <= 15: |
| return [4, 9, 13] |
|
|
| if file_size_mb > 14: |
| return [4, 9, 13] |
|
|
| return [2] |
|
|
|
|
| def check_image_nsfw(image_path): |
| image = Image.open(image_path).convert("RGB") |
| result = classifier(image) |
|
|
| for r in result: |
| if r["label"] == "nsfw" and r["score"] > 0.5: |
| return True |
|
|
| return False |
|
|
|
|
| def check_video_nsfw(video_path): |
| size_mb = os.path.getsize(video_path) / (1024 * 1024) |
| duration = get_video_duration(video_path) |
|
|
| times = get_frame_times(duration, size_mb) |
|
|
| for t in times: |
| frame = extract_frame(video_path, t) |
| if frame: |
| if check_image_nsfw(frame): |
| return True |
|
|
| return False |