facial_analyzer / app.py
Denisijcu's picture
upload files
b83c09e verified
import os
import cv2
import mediapipe as mp
import pandas as pd
import numpy as np
import logging
import subprocess
import tempfile
from flask import Flask, render_template, request, jsonify
from werkzeug.utils import secure_filename
import atexit
import shutil
# Configuración de logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Inicializa la aplicación de Flask
app = Flask(__name__)
# Configuración de Flask
app.config['SECRET_KEY'] = 'facial-analyzer-demo-2024-super-secret-key-12345'
app.config['WTF_CSRF_ENABLED'] = False # Deshabilitar CSRF para este ejemplo
# Configura las rutas para guardar los archivos temporales
UPLOAD_FOLDER = 'temp_files'
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 # 100MB max
# Inicializa MediaPipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
static_image_mode=False,
max_num_faces=1,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
def download_youtube_video(youtube_url, output_path):
"""
Descarga video de YouTube usando múltiples métodos (PyTube, yt-dlp).
Args:
youtube_url (str): URL del video de YouTube.
output_path (str): Carpeta donde guardar el video.
Returns:
tuple: (video_path, video_info) o (None, error_message).
"""
logger.info(f"Intentando descargar: {youtube_url}")
# Convertir URLs de Shorts a formato normal
if '/shorts/' in youtube_url:
video_id = youtube_url.split('/shorts/')[-1].split('?')[0]
youtube_url = f"https://www.youtube.com/watch?v={video_id}"
logger.info(f"Convertido de Shorts a: {youtube_url}")
# Método 1: Intentar con PyTube
try:
# Intentar importar PyTube (múltiples versiones)
try:
from pytubefix import YouTube, exceptions
logger.info("📦 Usando PyTubeFix")
except ImportError:
from pytube import YouTube, exceptions
logger.info("📦 Usando PyTube estándar")
logger.info("Probando método 1: PyTube")
yt = YouTube(
youtube_url,
use_oauth=False,
allow_oauth_cache=False
)
# Obtener información del video
video_info = {
'title': yt.title,
'duration': yt.length,
'author': yt.author
}
logger.info(f"Video encontrado: {video_info['title']} ({video_info['duration']}s)")
# Obtener stream
stream = None
streams = yt.streams.filter(progressive=True, file_extension='mp4')
if streams:
stream = streams.order_by('resolution').desc().first()
if not stream:
streams = yt.streams.filter(adaptive=True, file_extension='mp4', only_video=True)
if streams:
stream = streams.order_by('resolution').desc().first()
if not stream:
raise Exception("No se encontraron streams compatibles")
# Descargar
video_id = yt.video_id
safe_filename = secure_filename(f"{video_id}.mp4")
video_path = os.path.join(output_path, safe_filename)
stream.download(output_path=output_path, filename=safe_filename)
if os.path.exists(video_path):
logger.info("✅ Descarga exitosa con PyTube")
return video_path, video_info
else:
raise Exception("Archivo no se creó correctamente")
except Exception as e:
logger.warning(f"PyTube falló: {e}")
# Método 2: Intentar con yt-dlp (más robusto)
try:
logger.info("Probando método 2: yt-dlp")
# Extraer ID del video para nombre de archivo
if 'watch?v=' in youtube_url:
video_id = youtube_url.split('watch?v=')[-1].split('&')[0]
elif 'youtu.be/' in youtube_url:
video_id = youtube_url.split('youtu.be/')[-1].split('?')[0]
else:
video_id = 'video'
safe_filename = secure_filename(f"{video_id}.mp4")
video_path = os.path.join(output_path, safe_filename)
# Comando yt-dlp
cmd = [
'yt-dlp',
'--format', 'mp4[height<=720]/best[height<=720]/best',
'--output', video_path,
'--no-playlist',
youtube_url
]
logger.info(f"Ejecutando: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if result.returncode == 0 and os.path.exists(video_path):
# Obtener información del video
info_cmd = [
'yt-dlp',
'--print', 'title',
'--print', 'duration',
'--print', 'uploader',
'--no-download',
youtube_url
]
info_result = subprocess.run(info_cmd, capture_output=True, text=True, timeout=30)
if info_result.returncode == 0:
lines = info_result.stdout.strip().split('\n')
video_info = {
'title': lines[0] if len(lines) > 0 else 'Título desconocido',
'duration': int(float(lines[1])) if len(lines) > 1 and lines[1].replace('.', '').isdigit() else 0,
'author': lines[2] if len(lines) > 2 else 'Autor desconocido'
}
else:
video_info = {
'title': 'Video de YouTube',
'duration': 0,
'author': 'Desconocido'
}
logger.info("✅ Descarga exitosa con yt-dlp")
return video_path, video_info
else:
raise Exception(f"yt-dlp falló: {result.stderr}")
except subprocess.TimeoutExpired:
logger.warning("yt-dlp timeout")
except FileNotFoundError:
logger.warning("yt-dlp no está instalado")
except Exception as e:
logger.warning(f"yt-dlp falló: {e}")
# Si todos los métodos fallan
return None, "No se pudo descargar el video. Verifica que la URL sea correcta y el video sea público."
# Puntos clave específicos que necesitamos (optimización)
REQUIRED_LANDMARKS = {
'mouth': [13, 14, 61, 291],
'eyes': [386, 374, 362, 263],
'head': [4, 152, 234, 454],
'eyebrows': [107, 336, 17, 268] # Añadimos puntos para cejas
}
def cleanup_temp_files():
"""Limpia archivos temporales al cerrar la aplicación."""
if os.path.exists(UPLOAD_FOLDER):
try:
shutil.rmtree(UPLOAD_FOLDER)
logger.info("Archivos temporales limpiados")
except Exception as e:
logger.error(f"Error limpiando archivos temporales: {e}")
atexit.register(cleanup_temp_files)
def get_video_info(video_path):
"""
Obtiene información básica del video como FPS y duración.
Args:
video_path (str): Ruta al archivo de video.
Returns:
tuple: (fps, frame_count, duration)
"""
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return None, None, None
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = frame_count / fps if fps > 0 else 0
cap.release()
return fps, frame_count, duration
def extract_required_landmarks(landmarks):
"""
Extrae solo los landmarks necesarios para optimizar memoria.
Args:
landmarks: Landmarks de MediaPipe.
Returns:
dict: Diccionario con coordenadas de puntos específicos.
"""
result = {}
all_points = []
for category, indices in REQUIRED_LANDMARKS.items():
all_points.extend(indices)
for i in all_points:
if i < len(landmarks.landmark):
landmark = landmarks.landmark[i]
result[f'x_{i}'] = landmark.x
result[f'y_{i}'] = landmark.y
return result
def get_video_landmarks(video_path):
"""
Procesa un video y extrae los puntos faciales clave (optimizado).
Args:
video_path (str): Ruta al archivo de video.
Returns:
pd.DataFrame: Un DataFrame de pandas con los puntos clave por fotograma.
"""
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
logger.error(f"No se pudo abrir el video: {video_path}")
return None
# Obtener información del video
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
logger.info(f"Procesando video: {total_frames} frames a {fps} FPS")
data = []
frame_count = 0
processed_frames = 0
# Procesar cada frame
while cap.isOpened():
success, image = cap.read()
if not success:
break
# Preprocesar imagen
image.flags.writeable = False
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
results = face_mesh.process(image)
# Extraer datos del frame
row_data = {'frame': frame_count, 'fps': fps}
if results.multi_face_landmarks:
landmarks_data = extract_required_landmarks(results.multi_face_landmarks[0])
row_data.update(landmarks_data)
processed_frames += 1
else:
# Llenar con NaN si no se detecta cara
for category, indices in REQUIRED_LANDMARKS.items():
for i in indices:
row_data[f'x_{i}'] = np.nan
row_data[f'y_{i}'] = np.nan
data.append(row_data)
frame_count += 1
# Log progreso cada 100 frames
if frame_count % 100 == 0:
logger.info(f"Procesados {frame_count}/{total_frames} frames")
cap.release()
if processed_frames == 0:
logger.warning("No se detectaron caras en ningún frame")
return None
logger.info(f"Análisis completado: {processed_frames}/{frame_count} frames con detección facial")
return pd.DataFrame(data)
def calculate_metrics(df):
"""
Calcula las métricas de comportamiento a partir del DataFrame de puntos.
Args:
df (pd.DataFrame): DataFrame con los puntos faciales por fotograma.
Returns:
dict: Diccionario con todas las métricas calculadas.
"""
if df.empty or 'fps' not in df.columns:
return {'error': 'DataFrame vacío o sin información de FPS'}
total_frames = len(df)
video_fps = df['fps'].iloc[0] if not df['fps'].isna().all() else 30
total_video_time_seconds = total_frames / video_fps if total_frames > 0 else 0
# Filtrar frames válidos (con detección facial)
valid_frames = df.dropna()
if valid_frames.empty:
return {'error': 'No hay frames válidos con detección facial'}
# === ANÁLISIS DE SONRISAS ===
try:
y1, y2 = valid_frames['y_13'], valid_frames['y_14']
x1, x2 = valid_frames['x_61'], valid_frames['x_291']
# Calcular MAR (Mouth Aspect Ratio)
distancia_vertical = np.abs(y1 - y2)
distancia_horizontal = np.abs(x1 - x2)
mar = distancia_vertical / (distancia_horizontal + 1e-6)
# Detectar sonrisas
smiles = []
is_smiling = False
start_frame = 0
smile_threshold = 0.2
for i, (idx, row) in enumerate(valid_frames.iterrows()):
current_mar = mar.iloc[i]
current_frame = row['frame']
if current_mar > smile_threshold and not is_smiling:
is_smiling = True
start_frame = current_frame
elif current_mar <= smile_threshold and is_smiling:
is_smiling = False
end_frame = current_frame
duration = (end_frame - start_frame) / video_fps
if duration > 0.1: # Filtrar sonrisas muy cortas
smiles.append(duration)
total_smiles = len(smiles)
avg_smile_duration = np.mean(smiles) if smiles else 0
total_smile_time = sum(smiles) if smiles else 0
except Exception as e:
logger.error(f"Error en análisis de sonrisas: {e}")
total_smiles = 0
avg_smile_duration = 0
total_smile_time = 0
# === ANÁLISIS DE PARPADEOS ===
try:
# Calcular EAR (Eye Aspect Ratio)
dist_vert = np.abs(valid_frames['y_386'] - valid_frames['y_374'])
dist_horiz = np.abs(valid_frames['x_362'] - valid_frames['x_263'])
ear = dist_vert / (dist_horiz + 1e-6)
# Detectar parpadeos
blinks = 0
blink_threshold = 0.25
min_blink_frames = 2
blinks_consecutive_frames = 0
for value in ear:
if value < blink_threshold:
blinks_consecutive_frames += 1
else:
if blinks_consecutive_frames >= min_blink_frames:
blinks += 1
blinks_consecutive_frames = 0
blink_rate_per_minute = (blinks / total_video_time_seconds) * 60 if total_video_time_seconds > 0 else 0
except Exception as e:
logger.error(f"Error en análisis de parpadeos: {e}")
blinks = 0
blink_rate_per_minute = 0
# === ANÁLISIS DE MOVIMIENTO DE CABEZA ===
try:
movimiento_vertical_cabeza = valid_frames['y_4'] - valid_frames['y_152']
movimiento_horizontal_cabeza = valid_frames['x_234'] - valid_frames['x_454']
std_vertical_head = np.std(movimiento_vertical_cabeza)
std_horizontal_head = np.std(movimiento_horizontal_cabeza)
# Calcular movimiento total
total_head_movement = np.sqrt(std_vertical_head**2 + std_horizontal_head**2)
except Exception as e:
logger.error(f"Error en análisis de movimiento de cabeza: {e}")
std_vertical_head = 0
std_horizontal_head = 0
total_head_movement = 0
# === ANÁLISIS DE EMOCIONES NEGATIVAS ===
try:
# Cejas (diferencia vertical entre 107 y 336 para enojo/furioso)
brow_diff = np.abs(valid_frames['y_107'] - valid_frames['y_336'])
# Boca (usamos MAR ajustado para tensión)
mar_tension = mar # Reusamos MAR pero con umbral diferente
# Gruñón (diferencia vertical entre 17 y 268, boca neutral)
mouth_neutral = np.abs(valid_frames['y_17'] - valid_frames['y_268'])
# Detectar enojado (cejas fruncidas + boca tensa)
angry_events = []
is_angry = False
start_frame = 0
angry_brow_threshold = 0.05 # Umbral para cejas fruncidas
angry_mar_threshold = 0.15 # Umbral para boca tensa
for i, (idx, row) in enumerate(valid_frames.iterrows()):
current_brow = brow_diff.iloc[i]
current_mar = mar_tension.iloc[i]
current_frame = row['frame']
if current_brow < angry_brow_threshold and current_mar < angry_mar_threshold and not is_angry:
is_angry = True
start_frame = current_frame
elif (current_brow >= angry_brow_threshold or current_mar >= angry_mar_threshold) and is_angry:
is_angry = False
end_frame = current_frame
duration = (end_frame - start_frame) / video_fps
if duration > 0.1:
angry_events.append(duration)
total_angry = len(angry_events)
avg_angry_duration = np.mean(angry_events) if angry_events else 0
total_angry_time = sum(angry_events) if angry_events else 0
# Detectar gruñón (cejas ligeramente fruncidas + boca neutral)
grumpy_events = []
is_grumpy = False
start_frame = 0
grumpy_brow_threshold = 0.08 # Umbral más suave
grumpy_mouth_threshold = 0.03 # Diferencia vertical baja para neutral
for i, (idx, row) in enumerate(valid_frames.iterrows()):
current_brow = brow_diff.iloc[i]
current_mouth = mouth_neutral.iloc[i]
current_frame = row['frame']
if current_brow < grumpy_brow_threshold and current_mouth < grumpy_mouth_threshold and not is_grumpy:
is_grumpy = True
start_frame = current_frame
elif (current_brow >= grumpy_brow_threshold or current_mouth >= grumpy_mouth_threshold) and is_grumpy:
is_grumpy = False
end_frame = current_frame
duration = (end_frame - start_frame) / video_fps
if duration > 0.1:
grumpy_events.append(duration)
total_grumpy = len(grumpy_events)
avg_grumpy_duration = np.mean(grumpy_events) if grumpy_events else 0
total_grumpy_time = sum(grumpy_events) if grumpy_events else 0
# Detectar furioso (cejas muy fruncidas + boca tensa intensa)
furious_events = []
is_furious = False
start_frame = 0
furious_brow_threshold = 0.03 # Umbral más estricto
furious_mar_threshold = 0.10 # Umbral más bajo para tensión fuerte
for i, (idx, row) in enumerate(valid_frames.iterrows()):
current_brow = brow_diff.iloc[i]
current_mar = mar_tension.iloc[i]
current_frame = row['frame']
if current_brow < furious_brow_threshold and current_mar < furious_mar_threshold and not is_furious:
is_furious = True
start_frame = current_frame
elif (current_brow >= furious_brow_threshold or current_mar >= furious_mar_threshold) and is_furious:
is_furious = False
end_frame = current_frame
duration = (end_frame - start_frame) / video_fps
if duration > 0.1:
furious_events.append(duration)
total_furious = len(furious_events)
avg_furious_duration = np.mean(furious_events) if furious_events else 0
total_furious_time = sum(furious_events) if furious_events else 0
except Exception as e:
logger.error(f"Error en análisis de emociones negativas: {e}")
total_angry = 0
avg_angry_duration = 0
total_angry_time = 0
total_grumpy = 0
avg_grumpy_duration = 0
total_grumpy_time = 0
total_furious = 0
avg_furious_duration = 0
total_furious_time = 0
# === MÉTRICAS ADICIONALES ===
face_detection_rate = len(valid_frames) / total_frames * 100
return {
'video_info': {
'duracion_total': round(total_video_time_seconds, 2),
'fps': round(video_fps, 1),
'frames_totales': total_frames,
'frames_con_deteccion': len(valid_frames),
'tasa_deteccion_facial': round(face_detection_rate, 2)
},
'sonrisas': {
'total_sonrisas': total_smiles,
'duracion_promedio_sonrisas': round(avg_smile_duration, 2),
'tiempo_total_sonriendo': round(total_smile_time, 2),
'porcentaje_tiempo_sonriendo': round((total_smile_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0
},
'parpadeos': {
'total_parpadeos': blinks,
'tasa_parpadeo_por_minuto': round(blink_rate_per_minute, 2)
},
'movimiento_cabeza': {
'estabilidad_vertical': round(std_vertical_head, 4),
'estabilidad_horizontal': round(std_horizontal_head, 4),
'movimiento_total': round(total_head_movement, 4)
},
'emociones_negativas': {
'total_enojado': total_angry,
'duracion_promedio_enojado': round(avg_angry_duration, 2),
'porcentaje_tiempo_enojado': round((total_angry_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0,
'total_grunon': total_grumpy,
'duracion_promedio_grunon': round(avg_grumpy_duration, 2),
'porcentaje_tiempo_grunon': round((total_grumpy_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0,
'total_furioso': total_furious,
'duracion_promedio_furioso': round(avg_furious_duration, 2),
'porcentaje_tiempo_furioso': round((total_furious_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0
}
}
@app.route('/', methods=['GET'])
def index():
return render_template('index.html')
@app.route('/test', methods=['GET', 'POST'])
def test():
"""Ruta de prueba para debugging."""
if request.method == 'GET':
return jsonify({'status': 'success', 'message': 'Test endpoint working'})
else:
data = request.get_json() if request.is_json else request.form.to_dict()
return jsonify({'status': 'success', 'received_data': data})
@app.route('/health', methods=['GET'])
def health():
"""Health check endpoint."""
return jsonify({'status': 'healthy', 'message': 'Server is running'})
@app.before_request
def log_request_info():
"""Log información de cada request."""
logger.info(f"Request: {request.method} {request.url}")
logger.info(f"Headers: {dict(request.headers)}")
if request.method == 'POST':
logger.info(f"Content-Type: {request.content_type}")
logger.info(f"Content-Length: {request.content_length}")
@app.route('/analyze', methods=['POST'])
def analyze():
try:
# Intentar obtener datos del form primero, luego JSON
if request.content_type and 'application/json' in request.content_type:
data = request.get_json()
youtube_url = data.get('youtube_url', '').strip() if data else ''
else:
youtube_url = request.form.get('youtube_url', '').strip()
logger.info(f"Received URL: {youtube_url}")
logger.info(f"Content-Type: {request.content_type}")
# Validaciones de entrada
if not youtube_url:
return jsonify({
'status': 'error',
'message': 'Por favor, introduce una URL de YouTube válida.'
}), 400
if 'youtube.com' not in youtube_url and 'youtu.be' not in youtube_url:
return jsonify({
'status': 'error',
'message': 'La URL debe ser de YouTube.'
}), 400
except Exception as e:
logger.error(f"Error procesando request: {e}")
return jsonify({
'status': 'error',
'message': 'Error procesando la petición.'
}), 400
video_path = None
try:
logger.info(f"Iniciando análisis de: {youtube_url}")
# Descargar video usando método robusto
video_path, video_info = download_youtube_video(youtube_url, app.config['UPLOAD_FOLDER'])
if video_path is None:
return jsonify({
'status': 'error',
'message': video_info # video_info contiene el mensaje de error
})
logger.info(f"Video descargado: {video_info['title']}")
# Validar duración
if video_info['duration'] > 600: # 10 minutos
return jsonify({
'status': 'error',
'message': f'El video es demasiado largo ({video_info["duration"]//60} min). Máximo 10 minutos permitidos.'
})
# Verificar que el archivo se descargó correctamente
if not os.path.exists(video_path):
return jsonify({
'status': 'error',
'message': 'Error al descargar el video.'
})
logger.info("Video descargado. Iniciando análisis facial...")
# Procesar video
landmarks_df = get_video_landmarks(video_path)
if landmarks_df is None or landmarks_df.empty:
return jsonify({
'status': 'error',
'message': 'No se pudieron extraer los puntos faciales. Asegúrate de que el video muestre claramente una cara.'
})
logger.info("Análisis facial completado. Calculando métricas...")
# Calcular métricas
report_data = calculate_metrics(landmarks_df)
if 'error' in report_data:
return jsonify({
'status': 'error',
'message': f'Error en el cálculo de métricas: {report_data["error"]}'
})
# Agregar información del video
report_data['video_title'] = video_info['title']
report_data['video_url'] = youtube_url
logger.info("Análisis completado exitosamente")
return jsonify({'status': 'success', 'data': report_data})
except Exception as e:
logger.error(f"Error inesperado: {e}")
return jsonify({
'status': 'error',
'message': f'Error inesperado: {str(e)}'
})
finally:
# Limpiar archivo temporal
if video_path and os.path.exists(video_path):
try:
os.remove(video_path)
logger.info("Archivo temporal eliminado")
except Exception as e:
logger.error(f"Error eliminando archivo temporal: {e}")
@app.errorhandler(413)
def too_large(e):
return jsonify({
'status': 'error',
'message': 'El archivo es demasiado grande.'
}), 413
@app.errorhandler(500)
def internal_error(e):
return jsonify({
'status': 'error',
'message': 'Error interno del servidor.'
}), 500
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)