Spaces:
Runtime error
Runtime error
| from flask import Flask, request, jsonify, render_template, send_from_directory | |
| from transformers import ( | |
| AutoModelForSequenceClassification, | |
| AutoTokenizer, | |
| TFCLIPModel, | |
| CLIPProcessor, | |
| pipeline, | |
| BertTokenizer, | |
| BertForSequenceClassification | |
| ) | |
| import cv2 | |
| import os | |
| import subprocess | |
| import torch | |
| from PIL import Image | |
| import numpy as np | |
| import base64 | |
| import uuid | |
| from ultralytics import YOLO | |
| import tensorflow as tf | |
| import logging | |
# Logging: INFO and above go through the root configuration; this module
# logs through its own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The Flask application object used by the rest of this module.
app = Flask(__name__)

# Working directories created at import time so later file writes do not
# fail on a missing folder. `exist_ok=True` makes this safe to repeat.
for _directory in ('save', 'temp', 'unsafe_frames', 'audio', 'logs', 'text_output'):
    os.makedirs(_directory, exist_ok=True)
print("Loading models...")
try:
    # Identifiers shared between a model and its tokenizer/processor.
    _BERT_ID = 'bert-base-uncased'
    _TOXIC_ID = "unitary/toxic-bert"
    _HATE_ID = "Hate-speech-CNERG/dehatebert-mono-english"
    _CLIP_ID = "openai/clip-vit-base-patch32"

    # Image detector loaded from a local weights file.
    nudity_model = YOLO("Models/nudenet/320n.pt")

    # BERT tokenizer/model pair used by process_audio.
    bert_tokenizer = BertTokenizer.from_pretrained(_BERT_ID)
    bert_model = BertForSequenceClassification.from_pretrained(_BERT_ID)

    # Text classifiers used for the profanity and hate-speech scores.
    profanity_model = AutoModelForSequenceClassification.from_pretrained(_TOXIC_ID)
    profanity_tokenizer = AutoTokenizer.from_pretrained(_TOXIC_ID)
    hate_speech_model = AutoModelForSequenceClassification.from_pretrained(_HATE_ID)
    hate_speech_tokenizer = AutoTokenizer.from_pretrained(_HATE_ID)

    # CLIP (TensorFlow weights) and its processor, used for the harm score.
    clip_model = TFCLIPModel.from_pretrained(_CLIP_ID)
    clip_processor = CLIPProcessor.from_pretrained(_CLIP_ID)

    # Speech-to-text pipeline.
    whisper_model = pipeline("automatic-speech-recognition", model="openai/whisper-tiny")

    print("All models loaded successfully")
except Exception as load_error:
    # Log, then re-raise: the module cannot work without its models.
    logger.error(f"Error loading models: {load_error!s}")
    raise
def home():
    """Render and return the ``index.html`` template.

    NOTE(review): no ``@app.route`` decorator is visible on this function in
    the captured source - confirm how it is registered with ``app``.
    """
    page = render_template('index.html')
    return page
def extract_text():
    """Transcribe an already-extracted audio file and analyse the transcript.

    Reads the ``audio_file`` form field, looks the file up inside the
    ``audio`` directory, transcribes it with ``process_audio``, writes the
    transcript to ``text_output/text_<hex>.txt`` and scores it with
    ``analyze_text_content``.

    Returns:
        A JSON response with ``success``, ``text``, ``text_file``,
        ``confidence`` and ``analysis`` on success. On failure, a JSON
        ``{"error": ...}`` body with status 400 (missing or invalid name),
        404 (file not found) or 500 (processing error).

    NOTE(review): no ``@app.route`` decorator is visible on this function in
    the captured source - confirm how it is registered with ``app``.
    """
    try:
        audio_file = request.form.get('audio_file')
        if not audio_file:
            return jsonify({"error": "No audio file specified"}), 400

        # The name comes straight from the client. Accept only a bare file
        # name so values such as "../app.py" cannot escape the audio folder.
        bare_name = os.path.basename(audio_file.replace('\\', '/'))
        if bare_name != audio_file or audio_file in ('.', '..'):
            return jsonify({"error": "Invalid audio file name"}), 400

        audio_path = os.path.join('audio', audio_file)
        if not os.path.isfile(audio_path):
            return jsonify({"error": "Audio file not found"}), 404

        # Process audio and get text
        audio_result = process_audio(audio_path)
        if not audio_result['success']:
            return jsonify({"error": audio_result['error']}), 500

        # Save extracted text under a unique name
        text_filename = f"text_{uuid.uuid4().hex}.txt"
        text_path = os.path.join('text_output', text_filename)
        with open(text_path, 'w', encoding='utf-8') as f:
            f.write(audio_result['text'])

        # Analyze text content (None when the analysis itself failed)
        text_analysis = analyze_text_content(audio_result['text'])

        return jsonify({
            "success": True,
            "text": audio_result['text'],
            "text_file": text_filename,
            # .get(): do not fail the request if the key is absent.
            "confidence": audio_result.get('confidence', 0),
            "analysis": text_analysis
        })
    except Exception as e:
        logger.error(f"Error extracting text: {str(e)}")
        return jsonify({"error": str(e)}), 500
def serve_audio(filename):
    """Send the file called *filename* from the ``audio`` directory.

    NOTE(review): no ``@app.route`` decorator is visible on this function in
    the captured source - confirm how it is registered with ``app``.
    """
    audio_directory = 'audio'
    return send_from_directory(audio_directory, filename)
def _remove_if_present(path):
    """Delete *path*, ignoring the case where it no longer exists."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def upload_file():
    """Accept an uploaded video, analyse its frames and audio, and report.

    Steps:
      1. Save the upload (form field ``file``) into ``save/``.
      2. Sample frames with ``extract_frames``.
      3. Extract the audio track with ``extract_audio``; if that works,
         transcribe it with ``process_audio``, save the transcript to
         ``text_output/`` and score it with ``analyze_text_content``.
      4. Score frames in batches of 15 with ``analyze_batch``.
      5. Move frames flagged for nudity or harm into ``unsafe_frames/`` and
         delete all other temporary frame, thumbnail and video files.
      6. Average the per-frame ``meta_standards`` score into an overall
         assessment.

    Returns:
        A JSON response with ``success``, ``results``, ``audio_path``,
        ``audio_text``, ``text_file``, ``text_analysis`` and
        ``overall_assessment``. On failure, a JSON ``{"error": ...}`` body
        with status 400 (no upload) or 500 (processing error).

    NOTE(review): no ``@app.route`` decorator is visible on this function in
    the captured source - confirm how it is registered with ``app``.
    """
    try:
        if 'file' not in request.files:
            return jsonify({"error": "No file uploaded"}), 400
        video = request.files['file']
        if video.filename == '':
            return jsonify({"error": "No file selected"}), 400

        # The client-supplied name is untrusted: keep only its extension and
        # store the upload under a generated name. This blocks path traversal
        # and stops concurrent uploads with the same name from overwriting
        # each other.
        client_name = os.path.basename(video.filename.replace('\\', '/'))
        extension = os.path.splitext(client_name)[1]
        video_path = os.path.join('save', f"upload_{uuid.uuid4().hex}{extension}")
        video.save(video_path)

        frames = []
        try:
            frames = extract_frames(video_path)
            results = []

            audio_filename = f"audio_{uuid.uuid4().hex}.wav"
            audio_path = os.path.join('audio', audio_filename)
            text_content = ''
            text_filename = None
            text_analysis = None

            if extract_audio(video_path, audio_path):
                audio_text = process_audio(audio_path)
                # On failure process_audio puts an error message in 'text';
                # only a successful transcription counts as a transcript.
                if audio_text.get('success'):
                    text_content = audio_text.get('text', '')
                if text_content:
                    # Save extracted text
                    text_filename = f"text_{uuid.uuid4().hex}.txt"
                    text_path = os.path.join('text_output', text_filename)
                    with open(text_path, 'w', encoding='utf-8') as f:
                        f.write(text_content)
                    text_analysis = analyze_text_content(text_content)
            else:
                # No audio file was produced, so do not advertise one.
                audio_filename = None

            batch_size = 15
            for i in range(0, len(frames), batch_size):
                batch_frames = frames[i:i + batch_size]
                result = analyze_batch(batch_frames, text_content)
                if result is None:
                    # Batch failed; its temp files are removed in `finally`.
                    continue
                results.extend(result)

                # analyze_batch returns one result per frame, in order. The
                # flags live in the result, not in the frame dict.
                for frame_data, frame_result in zip(batch_frames, result):
                    flagged = (
                        frame_result['nudity']['is_inappropriate']
                        or frame_result['harm']['is_harmful']
                    )
                    if flagged:
                        # Keep the frame's real extension when archiving it.
                        frame_extension = os.path.splitext(frame_data['frame'])[1]
                        unique_filename = f'unsafe_{uuid.uuid4().hex}{frame_extension}'
                        unsafe_frame_path = os.path.join('unsafe_frames', unique_filename)
                        os.replace(frame_data['frame'], unsafe_frame_path)

            if results:
                total_meta_score = sum(r['meta_standards']['score'] for r in results) / len(results)
                # NOTE(review): these cut-offs (35 / 30) differ from the
                # per-frame ones in analyze_batch (60 / 25) - confirm intent.
                overall_assessment = {
                    "total_score": total_meta_score,
                    "risk_level": "High" if total_meta_score > 35 else "Medium" if total_meta_score > 30 else "Low",
                    "recommendation": get_recommendation(total_meta_score)
                }
            else:
                overall_assessment = {
                    "total_score": 0,
                    "risk_level": "Low",
                    "recommendation": "No issues detected"
                }

            return jsonify({
                "success": True,
                "results": results,
                "audio_path": audio_filename,
                "audio_text": text_content,
                "text_file": text_filename,
                "text_analysis": text_analysis,
                "overall_assessment": overall_assessment
            })
        except Exception as e:
            logger.error(f"Error in content analysis: {str(e)}")
            return jsonify({"error": str(e)}), 500
        finally:
            # Runs on success and failure. Archived frames were moved away,
            # so removing their old path is a harmless no-op.
            for frame_data in frames:
                _remove_if_present(frame_data['frame'])
                _remove_if_present(frame_data['thumbnail'])
            _remove_if_present(video_path)
    except Exception as e:
        logger.error(f"Error in upload: {str(e)}")
        return jsonify({"error": str(e)}), 500
def extract_frames(video_path):
    """Sample roughly one frame per second of video and write it to ``temp/``.

    For every sampled frame a full-size JPEG and a 648x648 thumbnail JPEG are
    written.

    Args:
        video_path: Path of the video file to read.

    Returns:
        A list of dicts, one per sampled frame, with keys ``frame`` (path of
        the full-size JPEG), ``thumbnail`` (path of the thumbnail JPEG) and
        ``timestamp`` (frame index integer-divided by the frame rate).

    Raises:
        Exception: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise Exception("Error opening video file")

        fps = int(cap.get(cv2.CAP_PROP_FPS))
        if fps <= 0:
            # A reported rate of 0 would make `frame_count % fps` divide by
            # zero. Assume a common rate instead of crashing.
            logger.warning(f"Invalid FPS reported for {video_path}; assuming 30")
            fps = 30

        # Per-call token so concurrent requests never share temp file names.
        run_id = uuid.uuid4().hex
        frames = []
        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % fps == 0:
                frame_path = os.path.join('temp', f'frame_{run_id}_{frame_count}.jpg')
                thumbnail_path = os.path.join('temp', f'thumb_{run_id}_{frame_count}.jpg')
                cv2.imwrite(frame_path, frame)
                thumbnail = cv2.resize(frame, (648, 648))
                cv2.imwrite(thumbnail_path, thumbnail)
                frames.append({
                    'frame': frame_path,
                    'thumbnail': thumbnail_path,
                    'timestamp': frame_count // fps
                })
            frame_count += 1
        return frames
    finally:
        # Release the capture handle even when reading or writing fails.
        cap.release()
def extract_audio(video_path, output_path):
    """Extract the audio track of a video into a WAV file using ffmpeg.

    The ffmpeg arguments drop the video stream (``-vn``), encode as
    ``pcm_s16le`` at 16000 Hz with one channel, and overwrite any existing
    output (``-y``).

    Args:
        video_path: Path of the source video.
        output_path: Path of the WAV file to write.

    Returns:
        ``output_path`` when a non-empty file was produced, otherwise
        ``None``. Failures are logged, never raised.
    """
    command = [
        'ffmpeg',
        '-i', video_path,
        '-vn',
        '-acodec', 'pcm_s16le',
        '-ar', '16000',
        '-ac', '1',
        '-y',
        output_path
    ]
    try:
        subprocess.run(
            command,
            check=True,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE
        )
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            logger.info(f"Audio extracted successfully: {output_path}")
            return output_path
        raise Exception("Audio extraction failed - empty or missing file")
    except subprocess.CalledProcessError as e:
        # The exception text only has the exit status. ffmpeg's explanation
        # is in the captured stderr, so log the tail of it as well.
        stderr_text = e.stderr.decode('utf-8', errors='replace').strip() if e.stderr else ''
        logger.error(f"Audio extraction error: {str(e)}; ffmpeg stderr: {stderr_text[-500:]}")
        return None
    except Exception as e:
        logger.error(f"Audio extraction error: {str(e)}")
        return None
def process_audio(audio_path):
    """Transcribe an audio file with the Whisper pipeline.

    The transcript is split into 512-character chunks, and each chunk is
    encoded and decoded again with ``bert_tokenizer`` before the chunks are
    joined with spaces.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        A dict that always has ``success``, ``text`` and ``confidence``. On
        failure ``success`` is ``False``, ``text`` is a short failure
        message, ``error`` gives the reason and ``confidence`` is 0. Errors
        are logged, never raised.
    """
    try:
        if not os.path.exists(audio_path):
            logger.error(f"Audio file not found: {audio_path}")
            return {
                'success': False,
                'text': "Audio file not found",
                'error': "File not found",
                'confidence': 0
            }
        logger.info(f"Processing audio file: {audio_path}")

        # First pass with Whisper
        whisper_result = whisper_model(audio_path)
        logger.info(f"Whisper result: {whisper_result}")
        text = whisper_result.get('text')
        if not text:
            logger.error("Whisper failed to extract text")
            return {
                'success': False,
                'text': "Whisper failed to extract text",
                'error': "No text found in Whisper output",
                'confidence': 0
            }

        # Second pass: tokenizer round-trip per 512-character chunk.
        # The original also ran each chunk through `bert_model` and threw the
        # output away. That forward pass never affected the result, so it is
        # dropped here.
        # NOTE(review): an uncased tokenizer presumably lower-cases and
        # re-spaces the transcript - confirm this normalisation is intended.
        chunks = [text[i:i + 512] for i in range(0, len(text), 512)]
        processed_chunks = []
        for chunk in chunks:
            inputs = bert_tokenizer(chunk, return_tensors="pt", truncation=True, max_length=512)
            processed_chunks.append(
                bert_tokenizer.decode(inputs['input_ids'][0], skip_special_tokens=True)
            )
        final_text = " ".join(processed_chunks)

        return {
            'success': True,
            'text': final_text,
            # Falls back to 0 when the pipeline output has no 'confidence'.
            'confidence': whisper_result.get('confidence', 0)
        }
    except Exception as e:
        logger.error(f"Audio processing error: {str(e)}")
        return {
            'success': False,
            'text': "Audio processing failed",
            'error': str(e),
            'confidence': 0
        }
def analyze_text_content(text):
    """Score a piece of text for profanity/toxicity and hate speech.

    Args:
        text: The text to analyse. It is truncated to the tokenizers'
            maximum length (``truncation=True``), so only the start of a
            long transcript is scored.

    Returns:
        ``{"profanity": {"score", "is_offensive"}, "hate_speech": {"score",
        "is_hateful"}}`` with scores in percent and flags set above 50 %,
        or ``None`` if the analysis raised (the error is logged).
    """
    try:
        # Analyze profanity
        profanity_inputs = profanity_tokenizer(text, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            profanity_outputs = profanity_model(**profanity_inputs)
        # unitary/toxic-bert is multi-label: each logit is an independent
        # sigmoid output. Softmax across labels followed by column 1 gives
        # one label's normalised share, not a probability. Use the sigmoid
        # and report the strongest category.
        profanity_probability = float(torch.sigmoid(profanity_outputs.logits)[0].max())

        # Analyze hate speech: single-label classifier, so softmax applies.
        # Column 1 is taken as the hateful class, as in the original code.
        hate_speech_inputs = hate_speech_tokenizer(text, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            hate_speech_outputs = hate_speech_model(**hate_speech_inputs)
        hate_speech_scores = torch.nn.functional.softmax(hate_speech_outputs.logits, dim=-1)
        hate_probability = float(hate_speech_scores[0][1])

        return {
            "profanity": {
                "score": profanity_probability * 100,
                "is_offensive": profanity_probability > 0.5
            },
            "hate_speech": {
                "score": hate_probability * 100,
                "is_hateful": hate_probability > 0.5
            }
        }
    except Exception as e:
        logger.error(f"Error analyzing text: {str(e)}")
        return None
def analyze_batch(batch_frames, text):
    """Score a batch of frames for nudity and harm, plus the transcript.

    Args:
        batch_frames: List of frame dicts with ``frame`` (image path),
            ``thumbnail`` (image path) and ``timestamp`` keys.
        text: Transcript shared by every frame. May be empty, in which case
            the profanity and hate-speech scores are 0.

    Returns:
        A list with one dict per frame, in input order, holding ``nudity``,
        ``profanity``, ``hate_speech``, ``harm`` and ``meta_standards``
        sections plus the base64 ``thumbnail`` and the ``timestamp``.
        Returns ``[]`` for an empty batch and ``None`` if anything raised
        (the error is logged).
    """
    try:
        if not batch_frames:
            return []

        # Zero-shot prompts for CLIP; column 1 is the "harmful" class.
        # The original passed the transcript as every prompt. Identical
        # prompts make the softmax uniform, so the harm score depended only
        # on batch size, and long transcripts could overflow CLIP's text
        # input and fail the whole batch.
        harm_prompts = [
            "a photo of safe, harmless content",
            "a photo of violent, graphic or harmful content",
        ]

        images = []
        timestamps = []
        for frame_data in batch_frames:
            # Close the file handle promptly; the resized copy is independent.
            with Image.open(frame_data['frame']) as source_image:
                images.append(source_image.convert('RGB').resize((128, 128)))
            timestamps.append(frame_data['timestamp'])

        # Prepare image data: stacked height/width/channel arrays in [0, 1],
        # permuted to channel-first for the detector.
        image_arrays = np.array([np.array(img) / 255.0 for img in images])
        image_tensors = torch.tensor(image_arrays).permute(0, 3, 1, 2).float()

        # Run analyses
        with torch.no_grad():
            nudity_results = nudity_model(image_tensors)
        nudity_predictions = [result.boxes for result in nudity_results]

        if text:
            profanity_inputs = profanity_tokenizer(text, return_tensors="pt", padding=True, truncation=True)
            with torch.no_grad():
                profanity_outputs = profanity_model(**profanity_inputs)
            # toxic-bert is multi-label: use sigmoid, take the strongest label.
            profanity_probability = float(torch.sigmoid(profanity_outputs.logits)[0].max())

            hate_speech_inputs = hate_speech_tokenizer(text, return_tensors="pt", padding=True, truncation=True)
            with torch.no_grad():
                hate_speech_outputs = hate_speech_model(**hate_speech_inputs)
            hate_speech_scores = torch.nn.functional.softmax(hate_speech_outputs.logits, dim=-1)
            hate_probability = float(hate_speech_scores[0][1])
        else:
            profanity_probability = 0.0
            hate_probability = 0.0

        # The text scores are the same for every frame in the batch.
        profanity_score = profanity_probability * 99
        hate_speech_score = hate_probability * 99

        clip_inputs = clip_processor(text=harm_prompts, images=images, return_tensors="tf", padding=True)
        clip_outputs = clip_model(**clip_inputs)
        # One row per image, one column per prompt.
        clip_scores = tf.nn.softmax(clip_outputs.logits_per_image, axis=-1).numpy()

        results = []
        for i, frame_data in enumerate(batch_frames):
            with open(frame_data['thumbnail'], 'rb') as img_file:
                thumbnail_base64 = base64.b64encode(img_file.read()).decode('utf-8')

            boxes = nudity_predictions[i]
            # Highest detection confidence, or 0 when nothing was detected.
            nudity_score = float(boxes.conf.max()) * 99 if len(boxes) > 0 else 0.0
            harm_score = float(clip_scores[i][1]) * 99

            # Weighted blend; the weights sum to 0.9 as in the original.
            meta_standards_score = (
                (nudity_score * 0.6) +
                (hate_speech_score * 0.1) +
                (harm_score * 0.1) +
                (profanity_score * 0.1)
            )
            results.append({
                "nudity": {
                    "score": nudity_score,
                    "is_inappropriate": nudity_score > 65
                },
                "profanity": {
                    "score": profanity_score,
                    "is_offensive": profanity_score > 65
                },
                "hate_speech": {
                    "score": hate_speech_score,
                    "is_hateful": hate_speech_score > 40
                },
                "harm": {
                    "score": harm_score,
                    "is_harmful": harm_score > 40
                },
                "meta_standards": {
                    "score": meta_standards_score,
                    "is_violating": meta_standards_score > 30,
                    "risk_level": "High" if meta_standards_score > 60 else "Medium" if meta_standards_score > 25 else "Low",
                    "recommendation": get_recommendation(meta_standards_score)
                },
                "thumbnail": thumbnail_base64,
                "timestamp": timestamps[i]
            })
        return results
    except Exception as e:
        logger.error(f"Error in batch analysis: {str(e)}")
        return None
def get_recommendation(score):
    """Map a numeric score to a recommendation sentence.

    Scores above 70 get the "likely violates" message, scores above 30 the
    "may need modifications" message, and everything else the "likely
    complies" message. Both thresholds are exclusive.
    """
    # Tiers ordered from strictest to mildest; the first exceeded one wins.
    tiers = (
        (70, "Content likely violates Meta Community Standards. Major modifications needed."),
        (30, "Content may need modifications to comply with Meta Community Standards."),
    )
    for threshold, message in tiers:
        if score > threshold:
            return message
    return "Content likely complies with Meta Community Standards."
if __name__ == "__main__":
    # The server listens on all interfaces. Debug mode enables the
    # interactive debugger, which lets anyone who can reach the port run
    # code, so it is now opt-in via FLASK_DEBUG instead of always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    # PORT lets the hosting platform pick the port; 5000 stays the default.
    listen_port = int(os.environ.get("PORT", "5000"))
    app.run(host="0.0.0.0", port=listen_port, debug=debug_enabled)