import os import cv2 import mediapipe as mp import pandas as pd import numpy as np import logging import subprocess import tempfile from flask import Flask, render_template, request, jsonify from werkzeug.utils import secure_filename import atexit import shutil # Configuración de logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Inicializa la aplicación de Flask app = Flask(__name__) # Configuración de Flask app.config['SECRET_KEY'] = 'facial-analyzer-demo-2024-super-secret-key-12345' app.config['WTF_CSRF_ENABLED'] = False # Deshabilitar CSRF para este ejemplo # Configura las rutas para guardar los archivos temporales UPLOAD_FOLDER = 'temp_files' if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 # 100MB max # Inicializa MediaPipe Face Mesh mp_face_mesh = mp.solutions.face_mesh face_mesh = mp_face_mesh.FaceMesh( static_image_mode=False, max_num_faces=1, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) def download_youtube_video(youtube_url, output_path): """ Descarga video de YouTube usando múltiples métodos (PyTube, yt-dlp). Args: youtube_url (str): URL del video de YouTube. output_path (str): Carpeta donde guardar el video. Returns: tuple: (video_path, video_info) o (None, error_message). """ logger.info(f"Intentando descargar: {youtube_url}") # Convertir URLs de Shorts a formato normal if '/shorts/' in youtube_url: video_id = youtube_url.split('/shorts/')[-1].split('?')[0] youtube_url = f"https://www.youtube.com/watch?v={video_id}" logger.info(f"Convertido de Shorts a: {youtube_url}") # Método 1: Intentar con PyTube try: # Intentar importar PyTube (múltiples versiones) try: from pytubefix import YouTube, exceptions logger.info("📦 Usando PyTubeFix") except ImportError: from pytube import YouTube, exceptions logger.info("📦 Usando PyTube estándar") logger.info("Probando método 1: PyTube") yt = YouTube( youtube_url, use_oauth=False, allow_oauth_cache=False ) # Obtener información del video video_info = { 'title': yt.title, 'duration': yt.length, 'author': yt.author } logger.info(f"Video encontrado: {video_info['title']} ({video_info['duration']}s)") # Obtener stream stream = None streams = yt.streams.filter(progressive=True, file_extension='mp4') if streams: stream = streams.order_by('resolution').desc().first() if not stream: streams = yt.streams.filter(adaptive=True, file_extension='mp4', only_video=True) if streams: stream = streams.order_by('resolution').desc().first() if not stream: raise Exception("No se encontraron streams compatibles") # Descargar video_id = yt.video_id safe_filename = secure_filename(f"{video_id}.mp4") video_path = os.path.join(output_path, safe_filename) stream.download(output_path=output_path, filename=safe_filename) if os.path.exists(video_path): logger.info("✅ Descarga exitosa con PyTube") return video_path, video_info else: raise Exception("Archivo no se creó correctamente") except Exception as e: logger.warning(f"PyTube falló: {e}") # Método 2: Intentar con yt-dlp (más robusto) try: logger.info("Probando método 2: yt-dlp") # Extraer ID del video para nombre de archivo if 'watch?v=' in youtube_url: video_id = youtube_url.split('watch?v=')[-1].split('&')[0] elif 'youtu.be/' in youtube_url: video_id = youtube_url.split('youtu.be/')[-1].split('?')[0] else: video_id = 'video' safe_filename = secure_filename(f"{video_id}.mp4") video_path = os.path.join(output_path, safe_filename) # Comando yt-dlp cmd = [ 'yt-dlp', '--format', 'mp4[height<=720]/best[height<=720]/best', '--output', video_path, '--no-playlist', youtube_url ] logger.info(f"Ejecutando: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) if result.returncode == 0 and os.path.exists(video_path): # Obtener información del video info_cmd = [ 'yt-dlp', '--print', 'title', '--print', 'duration', '--print', 'uploader', '--no-download', youtube_url ] info_result = subprocess.run(info_cmd, capture_output=True, text=True, timeout=30) if info_result.returncode == 0: lines = info_result.stdout.strip().split('\n') video_info = { 'title': lines[0] if len(lines) > 0 else 'Título desconocido', 'duration': int(float(lines[1])) if len(lines) > 1 and lines[1].replace('.', '').isdigit() else 0, 'author': lines[2] if len(lines) > 2 else 'Autor desconocido' } else: video_info = { 'title': 'Video de YouTube', 'duration': 0, 'author': 'Desconocido' } logger.info("✅ Descarga exitosa con yt-dlp") return video_path, video_info else: raise Exception(f"yt-dlp falló: {result.stderr}") except subprocess.TimeoutExpired: logger.warning("yt-dlp timeout") except FileNotFoundError: logger.warning("yt-dlp no está instalado") except Exception as e: logger.warning(f"yt-dlp falló: {e}") # Si todos los métodos fallan return None, "No se pudo descargar el video. Verifica que la URL sea correcta y el video sea público." # Puntos clave específicos que necesitamos (optimización) REQUIRED_LANDMARKS = { 'mouth': [13, 14, 61, 291], 'eyes': [386, 374, 362, 263], 'head': [4, 152, 234, 454], 'eyebrows': [107, 336, 17, 268] # Añadimos puntos para cejas } def cleanup_temp_files(): """Limpia archivos temporales al cerrar la aplicación.""" if os.path.exists(UPLOAD_FOLDER): try: shutil.rmtree(UPLOAD_FOLDER) logger.info("Archivos temporales limpiados") except Exception as e: logger.error(f"Error limpiando archivos temporales: {e}") atexit.register(cleanup_temp_files) def get_video_info(video_path): """ Obtiene información básica del video como FPS y duración. Args: video_path (str): Ruta al archivo de video. Returns: tuple: (fps, frame_count, duration) """ cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return None, None, None fps = cap.get(cv2.CAP_PROP_FPS) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = frame_count / fps if fps > 0 else 0 cap.release() return fps, frame_count, duration def extract_required_landmarks(landmarks): """ Extrae solo los landmarks necesarios para optimizar memoria. Args: landmarks: Landmarks de MediaPipe. Returns: dict: Diccionario con coordenadas de puntos específicos. """ result = {} all_points = [] for category, indices in REQUIRED_LANDMARKS.items(): all_points.extend(indices) for i in all_points: if i < len(landmarks.landmark): landmark = landmarks.landmark[i] result[f'x_{i}'] = landmark.x result[f'y_{i}'] = landmark.y return result def get_video_landmarks(video_path): """ Procesa un video y extrae los puntos faciales clave (optimizado). Args: video_path (str): Ruta al archivo de video. Returns: pd.DataFrame: Un DataFrame de pandas con los puntos clave por fotograma. """ cap = cv2.VideoCapture(video_path) if not cap.isOpened(): logger.error(f"No se pudo abrir el video: {video_path}") return None # Obtener información del video fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) logger.info(f"Procesando video: {total_frames} frames a {fps} FPS") data = [] frame_count = 0 processed_frames = 0 # Procesar cada frame while cap.isOpened(): success, image = cap.read() if not success: break # Preprocesar imagen image.flags.writeable = False image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) results = face_mesh.process(image) # Extraer datos del frame row_data = {'frame': frame_count, 'fps': fps} if results.multi_face_landmarks: landmarks_data = extract_required_landmarks(results.multi_face_landmarks[0]) row_data.update(landmarks_data) processed_frames += 1 else: # Llenar con NaN si no se detecta cara for category, indices in REQUIRED_LANDMARKS.items(): for i in indices: row_data[f'x_{i}'] = np.nan row_data[f'y_{i}'] = np.nan data.append(row_data) frame_count += 1 # Log progreso cada 100 frames if frame_count % 100 == 0: logger.info(f"Procesados {frame_count}/{total_frames} frames") cap.release() if processed_frames == 0: logger.warning("No se detectaron caras en ningún frame") return None logger.info(f"Análisis completado: {processed_frames}/{frame_count} frames con detección facial") return pd.DataFrame(data) def calculate_metrics(df): """ Calcula las métricas de comportamiento a partir del DataFrame de puntos. Args: df (pd.DataFrame): DataFrame con los puntos faciales por fotograma. Returns: dict: Diccionario con todas las métricas calculadas. """ if df.empty or 'fps' not in df.columns: return {'error': 'DataFrame vacío o sin información de FPS'} total_frames = len(df) video_fps = df['fps'].iloc[0] if not df['fps'].isna().all() else 30 total_video_time_seconds = total_frames / video_fps if total_frames > 0 else 0 # Filtrar frames válidos (con detección facial) valid_frames = df.dropna() if valid_frames.empty: return {'error': 'No hay frames válidos con detección facial'} # === ANÁLISIS DE SONRISAS === try: y1, y2 = valid_frames['y_13'], valid_frames['y_14'] x1, x2 = valid_frames['x_61'], valid_frames['x_291'] # Calcular MAR (Mouth Aspect Ratio) distancia_vertical = np.abs(y1 - y2) distancia_horizontal = np.abs(x1 - x2) mar = distancia_vertical / (distancia_horizontal + 1e-6) # Detectar sonrisas smiles = [] is_smiling = False start_frame = 0 smile_threshold = 0.2 for i, (idx, row) in enumerate(valid_frames.iterrows()): current_mar = mar.iloc[i] current_frame = row['frame'] if current_mar > smile_threshold and not is_smiling: is_smiling = True start_frame = current_frame elif current_mar <= smile_threshold and is_smiling: is_smiling = False end_frame = current_frame duration = (end_frame - start_frame) / video_fps if duration > 0.1: # Filtrar sonrisas muy cortas smiles.append(duration) total_smiles = len(smiles) avg_smile_duration = np.mean(smiles) if smiles else 0 total_smile_time = sum(smiles) if smiles else 0 except Exception as e: logger.error(f"Error en análisis de sonrisas: {e}") total_smiles = 0 avg_smile_duration = 0 total_smile_time = 0 # === ANÁLISIS DE PARPADEOS === try: # Calcular EAR (Eye Aspect Ratio) dist_vert = np.abs(valid_frames['y_386'] - valid_frames['y_374']) dist_horiz = np.abs(valid_frames['x_362'] - valid_frames['x_263']) ear = dist_vert / (dist_horiz + 1e-6) # Detectar parpadeos blinks = 0 blink_threshold = 0.25 min_blink_frames = 2 blinks_consecutive_frames = 0 for value in ear: if value < blink_threshold: blinks_consecutive_frames += 1 else: if blinks_consecutive_frames >= min_blink_frames: blinks += 1 blinks_consecutive_frames = 0 blink_rate_per_minute = (blinks / total_video_time_seconds) * 60 if total_video_time_seconds > 0 else 0 except Exception as e: logger.error(f"Error en análisis de parpadeos: {e}") blinks = 0 blink_rate_per_minute = 0 # === ANÁLISIS DE MOVIMIENTO DE CABEZA === try: movimiento_vertical_cabeza = valid_frames['y_4'] - valid_frames['y_152'] movimiento_horizontal_cabeza = valid_frames['x_234'] - valid_frames['x_454'] std_vertical_head = np.std(movimiento_vertical_cabeza) std_horizontal_head = np.std(movimiento_horizontal_cabeza) # Calcular movimiento total total_head_movement = np.sqrt(std_vertical_head**2 + std_horizontal_head**2) except Exception as e: logger.error(f"Error en análisis de movimiento de cabeza: {e}") std_vertical_head = 0 std_horizontal_head = 0 total_head_movement = 0 # === ANÁLISIS DE EMOCIONES NEGATIVAS === try: # Cejas (diferencia vertical entre 107 y 336 para enojo/furioso) brow_diff = np.abs(valid_frames['y_107'] - valid_frames['y_336']) # Boca (usamos MAR ajustado para tensión) mar_tension = mar # Reusamos MAR pero con umbral diferente # Gruñón (diferencia vertical entre 17 y 268, boca neutral) mouth_neutral = np.abs(valid_frames['y_17'] - valid_frames['y_268']) # Detectar enojado (cejas fruncidas + boca tensa) angry_events = [] is_angry = False start_frame = 0 angry_brow_threshold = 0.05 # Umbral para cejas fruncidas angry_mar_threshold = 0.15 # Umbral para boca tensa for i, (idx, row) in enumerate(valid_frames.iterrows()): current_brow = brow_diff.iloc[i] current_mar = mar_tension.iloc[i] current_frame = row['frame'] if current_brow < angry_brow_threshold and current_mar < angry_mar_threshold and not is_angry: is_angry = True start_frame = current_frame elif (current_brow >= angry_brow_threshold or current_mar >= angry_mar_threshold) and is_angry: is_angry = False end_frame = current_frame duration = (end_frame - start_frame) / video_fps if duration > 0.1: angry_events.append(duration) total_angry = len(angry_events) avg_angry_duration = np.mean(angry_events) if angry_events else 0 total_angry_time = sum(angry_events) if angry_events else 0 # Detectar gruñón (cejas ligeramente fruncidas + boca neutral) grumpy_events = [] is_grumpy = False start_frame = 0 grumpy_brow_threshold = 0.08 # Umbral más suave grumpy_mouth_threshold = 0.03 # Diferencia vertical baja para neutral for i, (idx, row) in enumerate(valid_frames.iterrows()): current_brow = brow_diff.iloc[i] current_mouth = mouth_neutral.iloc[i] current_frame = row['frame'] if current_brow < grumpy_brow_threshold and current_mouth < grumpy_mouth_threshold and not is_grumpy: is_grumpy = True start_frame = current_frame elif (current_brow >= grumpy_brow_threshold or current_mouth >= grumpy_mouth_threshold) and is_grumpy: is_grumpy = False end_frame = current_frame duration = (end_frame - start_frame) / video_fps if duration > 0.1: grumpy_events.append(duration) total_grumpy = len(grumpy_events) avg_grumpy_duration = np.mean(grumpy_events) if grumpy_events else 0 total_grumpy_time = sum(grumpy_events) if grumpy_events else 0 # Detectar furioso (cejas muy fruncidas + boca tensa intensa) furious_events = [] is_furious = False start_frame = 0 furious_brow_threshold = 0.03 # Umbral más estricto furious_mar_threshold = 0.10 # Umbral más bajo para tensión fuerte for i, (idx, row) in enumerate(valid_frames.iterrows()): current_brow = brow_diff.iloc[i] current_mar = mar_tension.iloc[i] current_frame = row['frame'] if current_brow < furious_brow_threshold and current_mar < furious_mar_threshold and not is_furious: is_furious = True start_frame = current_frame elif (current_brow >= furious_brow_threshold or current_mar >= furious_mar_threshold) and is_furious: is_furious = False end_frame = current_frame duration = (end_frame - start_frame) / video_fps if duration > 0.1: furious_events.append(duration) total_furious = len(furious_events) avg_furious_duration = np.mean(furious_events) if furious_events else 0 total_furious_time = sum(furious_events) if furious_events else 0 except Exception as e: logger.error(f"Error en análisis de emociones negativas: {e}") total_angry = 0 avg_angry_duration = 0 total_angry_time = 0 total_grumpy = 0 avg_grumpy_duration = 0 total_grumpy_time = 0 total_furious = 0 avg_furious_duration = 0 total_furious_time = 0 # === MÉTRICAS ADICIONALES === face_detection_rate = len(valid_frames) / total_frames * 100 return { 'video_info': { 'duracion_total': round(total_video_time_seconds, 2), 'fps': round(video_fps, 1), 'frames_totales': total_frames, 'frames_con_deteccion': len(valid_frames), 'tasa_deteccion_facial': round(face_detection_rate, 2) }, 'sonrisas': { 'total_sonrisas': total_smiles, 'duracion_promedio_sonrisas': round(avg_smile_duration, 2), 'tiempo_total_sonriendo': round(total_smile_time, 2), 'porcentaje_tiempo_sonriendo': round((total_smile_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0 }, 'parpadeos': { 'total_parpadeos': blinks, 'tasa_parpadeo_por_minuto': round(blink_rate_per_minute, 2) }, 'movimiento_cabeza': { 'estabilidad_vertical': round(std_vertical_head, 4), 'estabilidad_horizontal': round(std_horizontal_head, 4), 'movimiento_total': round(total_head_movement, 4) }, 'emociones_negativas': { 'total_enojado': total_angry, 'duracion_promedio_enojado': round(avg_angry_duration, 2), 'porcentaje_tiempo_enojado': round((total_angry_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0, 'total_grunon': total_grumpy, 'duracion_promedio_grunon': round(avg_grumpy_duration, 2), 'porcentaje_tiempo_grunon': round((total_grumpy_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0, 'total_furioso': total_furious, 'duracion_promedio_furioso': round(avg_furious_duration, 2), 'porcentaje_tiempo_furioso': round((total_furious_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0 } } @app.route('/', methods=['GET']) def index(): return render_template('index.html') @app.route('/test', methods=['GET', 'POST']) def test(): """Ruta de prueba para debugging.""" if request.method == 'GET': return jsonify({'status': 'success', 'message': 'Test endpoint working'}) else: data = request.get_json() if request.is_json else request.form.to_dict() return jsonify({'status': 'success', 'received_data': data}) @app.route('/health', methods=['GET']) def health(): """Health check endpoint.""" return jsonify({'status': 'healthy', 'message': 'Server is running'}) @app.before_request def log_request_info(): """Log información de cada request.""" logger.info(f"Request: {request.method} {request.url}") logger.info(f"Headers: {dict(request.headers)}") if request.method == 'POST': logger.info(f"Content-Type: {request.content_type}") logger.info(f"Content-Length: {request.content_length}") @app.route('/analyze', methods=['POST']) def analyze(): try: # Intentar obtener datos del form primero, luego JSON if request.content_type and 'application/json' in request.content_type: data = request.get_json() youtube_url = data.get('youtube_url', '').strip() if data else '' else: youtube_url = request.form.get('youtube_url', '').strip() logger.info(f"Received URL: {youtube_url}") logger.info(f"Content-Type: {request.content_type}") # Validaciones de entrada if not youtube_url: return jsonify({ 'status': 'error', 'message': 'Por favor, introduce una URL de YouTube válida.' }), 400 if 'youtube.com' not in youtube_url and 'youtu.be' not in youtube_url: return jsonify({ 'status': 'error', 'message': 'La URL debe ser de YouTube.' }), 400 except Exception as e: logger.error(f"Error procesando request: {e}") return jsonify({ 'status': 'error', 'message': 'Error procesando la petición.' }), 400 video_path = None try: logger.info(f"Iniciando análisis de: {youtube_url}") # Descargar video usando método robusto video_path, video_info = download_youtube_video(youtube_url, app.config['UPLOAD_FOLDER']) if video_path is None: return jsonify({ 'status': 'error', 'message': video_info # video_info contiene el mensaje de error }) logger.info(f"Video descargado: {video_info['title']}") # Validar duración if video_info['duration'] > 600: # 10 minutos return jsonify({ 'status': 'error', 'message': f'El video es demasiado largo ({video_info["duration"]//60} min). Máximo 10 minutos permitidos.' }) # Verificar que el archivo se descargó correctamente if not os.path.exists(video_path): return jsonify({ 'status': 'error', 'message': 'Error al descargar el video.' }) logger.info("Video descargado. Iniciando análisis facial...") # Procesar video landmarks_df = get_video_landmarks(video_path) if landmarks_df is None or landmarks_df.empty: return jsonify({ 'status': 'error', 'message': 'No se pudieron extraer los puntos faciales. Asegúrate de que el video muestre claramente una cara.' }) logger.info("Análisis facial completado. Calculando métricas...") # Calcular métricas report_data = calculate_metrics(landmarks_df) if 'error' in report_data: return jsonify({ 'status': 'error', 'message': f'Error en el cálculo de métricas: {report_data["error"]}' }) # Agregar información del video report_data['video_title'] = video_info['title'] report_data['video_url'] = youtube_url logger.info("Análisis completado exitosamente") return jsonify({'status': 'success', 'data': report_data}) except Exception as e: logger.error(f"Error inesperado: {e}") return jsonify({ 'status': 'error', 'message': f'Error inesperado: {str(e)}' }) finally: # Limpiar archivo temporal if video_path and os.path.exists(video_path): try: os.remove(video_path) logger.info("Archivo temporal eliminado") except Exception as e: logger.error(f"Error eliminando archivo temporal: {e}") @app.errorhandler(413) def too_large(e): return jsonify({ 'status': 'error', 'message': 'El archivo es demasiado grande.' }), 413 @app.errorhandler(500) def internal_error(e): return jsonify({ 'status': 'error', 'message': 'Error interno del servidor.' }), 500 if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=5000)