Spaces:
Runtime error
Runtime error
| import os | |
| import cv2 | |
| import mediapipe as mp | |
| import pandas as pd | |
| import numpy as np | |
| import logging | |
| import subprocess | |
| import tempfile | |
| from flask import Flask, render_template, request, jsonify | |
| from werkzeug.utils import secure_filename | |
| import atexit | |
| import shutil | |
| # Configuración de logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Inicializa la aplicación de Flask | |
| app = Flask(__name__) | |
| # Configuración de Flask | |
| app.config['SECRET_KEY'] = 'facial-analyzer-demo-2024-super-secret-key-12345' | |
| app.config['WTF_CSRF_ENABLED'] = False # Deshabilitar CSRF para este ejemplo | |
| # Configura las rutas para guardar los archivos temporales | |
| UPLOAD_FOLDER = 'temp_files' | |
| if not os.path.exists(UPLOAD_FOLDER): | |
| os.makedirs(UPLOAD_FOLDER) | |
| app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER | |
| app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 # 100MB max | |
| # Inicializa MediaPipe Face Mesh | |
| mp_face_mesh = mp.solutions.face_mesh | |
| face_mesh = mp_face_mesh.FaceMesh( | |
| static_image_mode=False, | |
| max_num_faces=1, | |
| min_detection_confidence=0.5, | |
| min_tracking_confidence=0.5 | |
| ) | |
| def download_youtube_video(youtube_url, output_path): | |
| """ | |
| Descarga video de YouTube usando múltiples métodos (PyTube, yt-dlp). | |
| Args: | |
| youtube_url (str): URL del video de YouTube. | |
| output_path (str): Carpeta donde guardar el video. | |
| Returns: | |
| tuple: (video_path, video_info) o (None, error_message). | |
| """ | |
| logger.info(f"Intentando descargar: {youtube_url}") | |
| # Convertir URLs de Shorts a formato normal | |
| if '/shorts/' in youtube_url: | |
| video_id = youtube_url.split('/shorts/')[-1].split('?')[0] | |
| youtube_url = f"https://www.youtube.com/watch?v={video_id}" | |
| logger.info(f"Convertido de Shorts a: {youtube_url}") | |
| # Método 1: Intentar con PyTube | |
| try: | |
| # Intentar importar PyTube (múltiples versiones) | |
| try: | |
| from pytubefix import YouTube, exceptions | |
| logger.info("📦 Usando PyTubeFix") | |
| except ImportError: | |
| from pytube import YouTube, exceptions | |
| logger.info("📦 Usando PyTube estándar") | |
| logger.info("Probando método 1: PyTube") | |
| yt = YouTube( | |
| youtube_url, | |
| use_oauth=False, | |
| allow_oauth_cache=False | |
| ) | |
| # Obtener información del video | |
| video_info = { | |
| 'title': yt.title, | |
| 'duration': yt.length, | |
| 'author': yt.author | |
| } | |
| logger.info(f"Video encontrado: {video_info['title']} ({video_info['duration']}s)") | |
| # Obtener stream | |
| stream = None | |
| streams = yt.streams.filter(progressive=True, file_extension='mp4') | |
| if streams: | |
| stream = streams.order_by('resolution').desc().first() | |
| if not stream: | |
| streams = yt.streams.filter(adaptive=True, file_extension='mp4', only_video=True) | |
| if streams: | |
| stream = streams.order_by('resolution').desc().first() | |
| if not stream: | |
| raise Exception("No se encontraron streams compatibles") | |
| # Descargar | |
| video_id = yt.video_id | |
| safe_filename = secure_filename(f"{video_id}.mp4") | |
| video_path = os.path.join(output_path, safe_filename) | |
| stream.download(output_path=output_path, filename=safe_filename) | |
| if os.path.exists(video_path): | |
| logger.info("✅ Descarga exitosa con PyTube") | |
| return video_path, video_info | |
| else: | |
| raise Exception("Archivo no se creó correctamente") | |
| except Exception as e: | |
| logger.warning(f"PyTube falló: {e}") | |
| # Método 2: Intentar con yt-dlp (más robusto) | |
| try: | |
| logger.info("Probando método 2: yt-dlp") | |
| # Extraer ID del video para nombre de archivo | |
| if 'watch?v=' in youtube_url: | |
| video_id = youtube_url.split('watch?v=')[-1].split('&')[0] | |
| elif 'youtu.be/' in youtube_url: | |
| video_id = youtube_url.split('youtu.be/')[-1].split('?')[0] | |
| else: | |
| video_id = 'video' | |
| safe_filename = secure_filename(f"{video_id}.mp4") | |
| video_path = os.path.join(output_path, safe_filename) | |
| # Comando yt-dlp | |
| cmd = [ | |
| 'yt-dlp', | |
| '--format', 'mp4[height<=720]/best[height<=720]/best', | |
| '--output', video_path, | |
| '--no-playlist', | |
| youtube_url | |
| ] | |
| logger.info(f"Ejecutando: {' '.join(cmd)}") | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) | |
| if result.returncode == 0 and os.path.exists(video_path): | |
| # Obtener información del video | |
| info_cmd = [ | |
| 'yt-dlp', | |
| '--print', 'title', | |
| '--print', 'duration', | |
| '--print', 'uploader', | |
| '--no-download', | |
| youtube_url | |
| ] | |
| info_result = subprocess.run(info_cmd, capture_output=True, text=True, timeout=30) | |
| if info_result.returncode == 0: | |
| lines = info_result.stdout.strip().split('\n') | |
| video_info = { | |
| 'title': lines[0] if len(lines) > 0 else 'Título desconocido', | |
| 'duration': int(float(lines[1])) if len(lines) > 1 and lines[1].replace('.', '').isdigit() else 0, | |
| 'author': lines[2] if len(lines) > 2 else 'Autor desconocido' | |
| } | |
| else: | |
| video_info = { | |
| 'title': 'Video de YouTube', | |
| 'duration': 0, | |
| 'author': 'Desconocido' | |
| } | |
| logger.info("✅ Descarga exitosa con yt-dlp") | |
| return video_path, video_info | |
| else: | |
| raise Exception(f"yt-dlp falló: {result.stderr}") | |
| except subprocess.TimeoutExpired: | |
| logger.warning("yt-dlp timeout") | |
| except FileNotFoundError: | |
| logger.warning("yt-dlp no está instalado") | |
| except Exception as e: | |
| logger.warning(f"yt-dlp falló: {e}") | |
| # Si todos los métodos fallan | |
| return None, "No se pudo descargar el video. Verifica que la URL sea correcta y el video sea público." | |
| # Puntos clave específicos que necesitamos (optimización) | |
| REQUIRED_LANDMARKS = { | |
| 'mouth': [13, 14, 61, 291], | |
| 'eyes': [386, 374, 362, 263], | |
| 'head': [4, 152, 234, 454], | |
| 'eyebrows': [107, 336, 17, 268] # Añadimos puntos para cejas | |
| } | |
| def cleanup_temp_files(): | |
| """Limpia archivos temporales al cerrar la aplicación.""" | |
| if os.path.exists(UPLOAD_FOLDER): | |
| try: | |
| shutil.rmtree(UPLOAD_FOLDER) | |
| logger.info("Archivos temporales limpiados") | |
| except Exception as e: | |
| logger.error(f"Error limpiando archivos temporales: {e}") | |
| atexit.register(cleanup_temp_files) | |
| def get_video_info(video_path): | |
| """ | |
| Obtiene información básica del video como FPS y duración. | |
| Args: | |
| video_path (str): Ruta al archivo de video. | |
| Returns: | |
| tuple: (fps, frame_count, duration) | |
| """ | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| return None, None, None | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| duration = frame_count / fps if fps > 0 else 0 | |
| cap.release() | |
| return fps, frame_count, duration | |
| def extract_required_landmarks(landmarks): | |
| """ | |
| Extrae solo los landmarks necesarios para optimizar memoria. | |
| Args: | |
| landmarks: Landmarks de MediaPipe. | |
| Returns: | |
| dict: Diccionario con coordenadas de puntos específicos. | |
| """ | |
| result = {} | |
| all_points = [] | |
| for category, indices in REQUIRED_LANDMARKS.items(): | |
| all_points.extend(indices) | |
| for i in all_points: | |
| if i < len(landmarks.landmark): | |
| landmark = landmarks.landmark[i] | |
| result[f'x_{i}'] = landmark.x | |
| result[f'y_{i}'] = landmark.y | |
| return result | |
| def get_video_landmarks(video_path): | |
| """ | |
| Procesa un video y extrae los puntos faciales clave (optimizado). | |
| Args: | |
| video_path (str): Ruta al archivo de video. | |
| Returns: | |
| pd.DataFrame: Un DataFrame de pandas con los puntos clave por fotograma. | |
| """ | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| logger.error(f"No se pudo abrir el video: {video_path}") | |
| return None | |
| # Obtener información del video | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| logger.info(f"Procesando video: {total_frames} frames a {fps} FPS") | |
| data = [] | |
| frame_count = 0 | |
| processed_frames = 0 | |
| # Procesar cada frame | |
| while cap.isOpened(): | |
| success, image = cap.read() | |
| if not success: | |
| break | |
| # Preprocesar imagen | |
| image.flags.writeable = False | |
| image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
| results = face_mesh.process(image) | |
| # Extraer datos del frame | |
| row_data = {'frame': frame_count, 'fps': fps} | |
| if results.multi_face_landmarks: | |
| landmarks_data = extract_required_landmarks(results.multi_face_landmarks[0]) | |
| row_data.update(landmarks_data) | |
| processed_frames += 1 | |
| else: | |
| # Llenar con NaN si no se detecta cara | |
| for category, indices in REQUIRED_LANDMARKS.items(): | |
| for i in indices: | |
| row_data[f'x_{i}'] = np.nan | |
| row_data[f'y_{i}'] = np.nan | |
| data.append(row_data) | |
| frame_count += 1 | |
| # Log progreso cada 100 frames | |
| if frame_count % 100 == 0: | |
| logger.info(f"Procesados {frame_count}/{total_frames} frames") | |
| cap.release() | |
| if processed_frames == 0: | |
| logger.warning("No se detectaron caras en ningún frame") | |
| return None | |
| logger.info(f"Análisis completado: {processed_frames}/{frame_count} frames con detección facial") | |
| return pd.DataFrame(data) | |
| def calculate_metrics(df): | |
| """ | |
| Calcula las métricas de comportamiento a partir del DataFrame de puntos. | |
| Args: | |
| df (pd.DataFrame): DataFrame con los puntos faciales por fotograma. | |
| Returns: | |
| dict: Diccionario con todas las métricas calculadas. | |
| """ | |
| if df.empty or 'fps' not in df.columns: | |
| return {'error': 'DataFrame vacío o sin información de FPS'} | |
| total_frames = len(df) | |
| video_fps = df['fps'].iloc[0] if not df['fps'].isna().all() else 30 | |
| total_video_time_seconds = total_frames / video_fps if total_frames > 0 else 0 | |
| # Filtrar frames válidos (con detección facial) | |
| valid_frames = df.dropna() | |
| if valid_frames.empty: | |
| return {'error': 'No hay frames válidos con detección facial'} | |
| # === ANÁLISIS DE SONRISAS === | |
| try: | |
| y1, y2 = valid_frames['y_13'], valid_frames['y_14'] | |
| x1, x2 = valid_frames['x_61'], valid_frames['x_291'] | |
| # Calcular MAR (Mouth Aspect Ratio) | |
| distancia_vertical = np.abs(y1 - y2) | |
| distancia_horizontal = np.abs(x1 - x2) | |
| mar = distancia_vertical / (distancia_horizontal + 1e-6) | |
| # Detectar sonrisas | |
| smiles = [] | |
| is_smiling = False | |
| start_frame = 0 | |
| smile_threshold = 0.2 | |
| for i, (idx, row) in enumerate(valid_frames.iterrows()): | |
| current_mar = mar.iloc[i] | |
| current_frame = row['frame'] | |
| if current_mar > smile_threshold and not is_smiling: | |
| is_smiling = True | |
| start_frame = current_frame | |
| elif current_mar <= smile_threshold and is_smiling: | |
| is_smiling = False | |
| end_frame = current_frame | |
| duration = (end_frame - start_frame) / video_fps | |
| if duration > 0.1: # Filtrar sonrisas muy cortas | |
| smiles.append(duration) | |
| total_smiles = len(smiles) | |
| avg_smile_duration = np.mean(smiles) if smiles else 0 | |
| total_smile_time = sum(smiles) if smiles else 0 | |
| except Exception as e: | |
| logger.error(f"Error en análisis de sonrisas: {e}") | |
| total_smiles = 0 | |
| avg_smile_duration = 0 | |
| total_smile_time = 0 | |
| # === ANÁLISIS DE PARPADEOS === | |
| try: | |
| # Calcular EAR (Eye Aspect Ratio) | |
| dist_vert = np.abs(valid_frames['y_386'] - valid_frames['y_374']) | |
| dist_horiz = np.abs(valid_frames['x_362'] - valid_frames['x_263']) | |
| ear = dist_vert / (dist_horiz + 1e-6) | |
| # Detectar parpadeos | |
| blinks = 0 | |
| blink_threshold = 0.25 | |
| min_blink_frames = 2 | |
| blinks_consecutive_frames = 0 | |
| for value in ear: | |
| if value < blink_threshold: | |
| blinks_consecutive_frames += 1 | |
| else: | |
| if blinks_consecutive_frames >= min_blink_frames: | |
| blinks += 1 | |
| blinks_consecutive_frames = 0 | |
| blink_rate_per_minute = (blinks / total_video_time_seconds) * 60 if total_video_time_seconds > 0 else 0 | |
| except Exception as e: | |
| logger.error(f"Error en análisis de parpadeos: {e}") | |
| blinks = 0 | |
| blink_rate_per_minute = 0 | |
| # === ANÁLISIS DE MOVIMIENTO DE CABEZA === | |
| try: | |
| movimiento_vertical_cabeza = valid_frames['y_4'] - valid_frames['y_152'] | |
| movimiento_horizontal_cabeza = valid_frames['x_234'] - valid_frames['x_454'] | |
| std_vertical_head = np.std(movimiento_vertical_cabeza) | |
| std_horizontal_head = np.std(movimiento_horizontal_cabeza) | |
| # Calcular movimiento total | |
| total_head_movement = np.sqrt(std_vertical_head**2 + std_horizontal_head**2) | |
| except Exception as e: | |
| logger.error(f"Error en análisis de movimiento de cabeza: {e}") | |
| std_vertical_head = 0 | |
| std_horizontal_head = 0 | |
| total_head_movement = 0 | |
| # === ANÁLISIS DE EMOCIONES NEGATIVAS === | |
| try: | |
| # Cejas (diferencia vertical entre 107 y 336 para enojo/furioso) | |
| brow_diff = np.abs(valid_frames['y_107'] - valid_frames['y_336']) | |
| # Boca (usamos MAR ajustado para tensión) | |
| mar_tension = mar # Reusamos MAR pero con umbral diferente | |
| # Gruñón (diferencia vertical entre 17 y 268, boca neutral) | |
| mouth_neutral = np.abs(valid_frames['y_17'] - valid_frames['y_268']) | |
| # Detectar enojado (cejas fruncidas + boca tensa) | |
| angry_events = [] | |
| is_angry = False | |
| start_frame = 0 | |
| angry_brow_threshold = 0.05 # Umbral para cejas fruncidas | |
| angry_mar_threshold = 0.15 # Umbral para boca tensa | |
| for i, (idx, row) in enumerate(valid_frames.iterrows()): | |
| current_brow = brow_diff.iloc[i] | |
| current_mar = mar_tension.iloc[i] | |
| current_frame = row['frame'] | |
| if current_brow < angry_brow_threshold and current_mar < angry_mar_threshold and not is_angry: | |
| is_angry = True | |
| start_frame = current_frame | |
| elif (current_brow >= angry_brow_threshold or current_mar >= angry_mar_threshold) and is_angry: | |
| is_angry = False | |
| end_frame = current_frame | |
| duration = (end_frame - start_frame) / video_fps | |
| if duration > 0.1: | |
| angry_events.append(duration) | |
| total_angry = len(angry_events) | |
| avg_angry_duration = np.mean(angry_events) if angry_events else 0 | |
| total_angry_time = sum(angry_events) if angry_events else 0 | |
| # Detectar gruñón (cejas ligeramente fruncidas + boca neutral) | |
| grumpy_events = [] | |
| is_grumpy = False | |
| start_frame = 0 | |
| grumpy_brow_threshold = 0.08 # Umbral más suave | |
| grumpy_mouth_threshold = 0.03 # Diferencia vertical baja para neutral | |
| for i, (idx, row) in enumerate(valid_frames.iterrows()): | |
| current_brow = brow_diff.iloc[i] | |
| current_mouth = mouth_neutral.iloc[i] | |
| current_frame = row['frame'] | |
| if current_brow < grumpy_brow_threshold and current_mouth < grumpy_mouth_threshold and not is_grumpy: | |
| is_grumpy = True | |
| start_frame = current_frame | |
| elif (current_brow >= grumpy_brow_threshold or current_mouth >= grumpy_mouth_threshold) and is_grumpy: | |
| is_grumpy = False | |
| end_frame = current_frame | |
| duration = (end_frame - start_frame) / video_fps | |
| if duration > 0.1: | |
| grumpy_events.append(duration) | |
| total_grumpy = len(grumpy_events) | |
| avg_grumpy_duration = np.mean(grumpy_events) if grumpy_events else 0 | |
| total_grumpy_time = sum(grumpy_events) if grumpy_events else 0 | |
| # Detectar furioso (cejas muy fruncidas + boca tensa intensa) | |
| furious_events = [] | |
| is_furious = False | |
| start_frame = 0 | |
| furious_brow_threshold = 0.03 # Umbral más estricto | |
| furious_mar_threshold = 0.10 # Umbral más bajo para tensión fuerte | |
| for i, (idx, row) in enumerate(valid_frames.iterrows()): | |
| current_brow = brow_diff.iloc[i] | |
| current_mar = mar_tension.iloc[i] | |
| current_frame = row['frame'] | |
| if current_brow < furious_brow_threshold and current_mar < furious_mar_threshold and not is_furious: | |
| is_furious = True | |
| start_frame = current_frame | |
| elif (current_brow >= furious_brow_threshold or current_mar >= furious_mar_threshold) and is_furious: | |
| is_furious = False | |
| end_frame = current_frame | |
| duration = (end_frame - start_frame) / video_fps | |
| if duration > 0.1: | |
| furious_events.append(duration) | |
| total_furious = len(furious_events) | |
| avg_furious_duration = np.mean(furious_events) if furious_events else 0 | |
| total_furious_time = sum(furious_events) if furious_events else 0 | |
| except Exception as e: | |
| logger.error(f"Error en análisis de emociones negativas: {e}") | |
| total_angry = 0 | |
| avg_angry_duration = 0 | |
| total_angry_time = 0 | |
| total_grumpy = 0 | |
| avg_grumpy_duration = 0 | |
| total_grumpy_time = 0 | |
| total_furious = 0 | |
| avg_furious_duration = 0 | |
| total_furious_time = 0 | |
| # === MÉTRICAS ADICIONALES === | |
| face_detection_rate = len(valid_frames) / total_frames * 100 | |
| return { | |
| 'video_info': { | |
| 'duracion_total': round(total_video_time_seconds, 2), | |
| 'fps': round(video_fps, 1), | |
| 'frames_totales': total_frames, | |
| 'frames_con_deteccion': len(valid_frames), | |
| 'tasa_deteccion_facial': round(face_detection_rate, 2) | |
| }, | |
| 'sonrisas': { | |
| 'total_sonrisas': total_smiles, | |
| 'duracion_promedio_sonrisas': round(avg_smile_duration, 2), | |
| 'tiempo_total_sonriendo': round(total_smile_time, 2), | |
| 'porcentaje_tiempo_sonriendo': round((total_smile_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0 | |
| }, | |
| 'parpadeos': { | |
| 'total_parpadeos': blinks, | |
| 'tasa_parpadeo_por_minuto': round(blink_rate_per_minute, 2) | |
| }, | |
| 'movimiento_cabeza': { | |
| 'estabilidad_vertical': round(std_vertical_head, 4), | |
| 'estabilidad_horizontal': round(std_horizontal_head, 4), | |
| 'movimiento_total': round(total_head_movement, 4) | |
| }, | |
| 'emociones_negativas': { | |
| 'total_enojado': total_angry, | |
| 'duracion_promedio_enojado': round(avg_angry_duration, 2), | |
| 'porcentaje_tiempo_enojado': round((total_angry_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0, | |
| 'total_grunon': total_grumpy, | |
| 'duracion_promedio_grunon': round(avg_grumpy_duration, 2), | |
| 'porcentaje_tiempo_grunon': round((total_grumpy_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0, | |
| 'total_furioso': total_furious, | |
| 'duracion_promedio_furioso': round(avg_furious_duration, 2), | |
| 'porcentaje_tiempo_furioso': round((total_furious_time / total_video_time_seconds) * 100, 2) if total_video_time_seconds > 0 else 0 | |
| } | |
| } | |
| def index(): | |
| return render_template('index.html') | |
| def test(): | |
| """Ruta de prueba para debugging.""" | |
| if request.method == 'GET': | |
| return jsonify({'status': 'success', 'message': 'Test endpoint working'}) | |
| else: | |
| data = request.get_json() if request.is_json else request.form.to_dict() | |
| return jsonify({'status': 'success', 'received_data': data}) | |
| def health(): | |
| """Health check endpoint.""" | |
| return jsonify({'status': 'healthy', 'message': 'Server is running'}) | |
| def log_request_info(): | |
| """Log información de cada request.""" | |
| logger.info(f"Request: {request.method} {request.url}") | |
| logger.info(f"Headers: {dict(request.headers)}") | |
| if request.method == 'POST': | |
| logger.info(f"Content-Type: {request.content_type}") | |
| logger.info(f"Content-Length: {request.content_length}") | |
| def analyze(): | |
| try: | |
| # Intentar obtener datos del form primero, luego JSON | |
| if request.content_type and 'application/json' in request.content_type: | |
| data = request.get_json() | |
| youtube_url = data.get('youtube_url', '').strip() if data else '' | |
| else: | |
| youtube_url = request.form.get('youtube_url', '').strip() | |
| logger.info(f"Received URL: {youtube_url}") | |
| logger.info(f"Content-Type: {request.content_type}") | |
| # Validaciones de entrada | |
| if not youtube_url: | |
| return jsonify({ | |
| 'status': 'error', | |
| 'message': 'Por favor, introduce una URL de YouTube válida.' | |
| }), 400 | |
| if 'youtube.com' not in youtube_url and 'youtu.be' not in youtube_url: | |
| return jsonify({ | |
| 'status': 'error', | |
| 'message': 'La URL debe ser de YouTube.' | |
| }), 400 | |
| except Exception as e: | |
| logger.error(f"Error procesando request: {e}") | |
| return jsonify({ | |
| 'status': 'error', | |
| 'message': 'Error procesando la petición.' | |
| }), 400 | |
| video_path = None | |
| try: | |
| logger.info(f"Iniciando análisis de: {youtube_url}") | |
| # Descargar video usando método robusto | |
| video_path, video_info = download_youtube_video(youtube_url, app.config['UPLOAD_FOLDER']) | |
| if video_path is None: | |
| return jsonify({ | |
| 'status': 'error', | |
| 'message': video_info # video_info contiene el mensaje de error | |
| }) | |
| logger.info(f"Video descargado: {video_info['title']}") | |
| # Validar duración | |
| if video_info['duration'] > 600: # 10 minutos | |
| return jsonify({ | |
| 'status': 'error', | |
| 'message': f'El video es demasiado largo ({video_info["duration"]//60} min). Máximo 10 minutos permitidos.' | |
| }) | |
| # Verificar que el archivo se descargó correctamente | |
| if not os.path.exists(video_path): | |
| return jsonify({ | |
| 'status': 'error', | |
| 'message': 'Error al descargar el video.' | |
| }) | |
| logger.info("Video descargado. Iniciando análisis facial...") | |
| # Procesar video | |
| landmarks_df = get_video_landmarks(video_path) | |
| if landmarks_df is None or landmarks_df.empty: | |
| return jsonify({ | |
| 'status': 'error', | |
| 'message': 'No se pudieron extraer los puntos faciales. Asegúrate de que el video muestre claramente una cara.' | |
| }) | |
| logger.info("Análisis facial completado. Calculando métricas...") | |
| # Calcular métricas | |
| report_data = calculate_metrics(landmarks_df) | |
| if 'error' in report_data: | |
| return jsonify({ | |
| 'status': 'error', | |
| 'message': f'Error en el cálculo de métricas: {report_data["error"]}' | |
| }) | |
| # Agregar información del video | |
| report_data['video_title'] = video_info['title'] | |
| report_data['video_url'] = youtube_url | |
| logger.info("Análisis completado exitosamente") | |
| return jsonify({'status': 'success', 'data': report_data}) | |
| except Exception as e: | |
| logger.error(f"Error inesperado: {e}") | |
| return jsonify({ | |
| 'status': 'error', | |
| 'message': f'Error inesperado: {str(e)}' | |
| }) | |
| finally: | |
| # Limpiar archivo temporal | |
| if video_path and os.path.exists(video_path): | |
| try: | |
| os.remove(video_path) | |
| logger.info("Archivo temporal eliminado") | |
| except Exception as e: | |
| logger.error(f"Error eliminando archivo temporal: {e}") | |
| def too_large(e): | |
| return jsonify({ | |
| 'status': 'error', | |
| 'message': 'El archivo es demasiado grande.' | |
| }), 413 | |
| def internal_error(e): | |
| return jsonify({ | |
| 'status': 'error', | |
| 'message': 'Error interno del servidor.' | |
| }), 500 | |
| if __name__ == '__main__': | |
| app.run(debug=True, host='0.0.0.0', port=5000) |