"""End-to-end processing pipeline for sign-language (Libras) videos.

The pipeline runs the following stages for one input video:

1. normalise the video (delegated to ``VideoProcessor``),
2. extract keypoints (delegated to ``MediaPipeProcessor``),
3. write the keypoints to a JSON file,
4. assemble a per-video folder (video + keypoints + metadata),
5. optionally upload that folder (delegated to ``HuggingFaceUploader``).
"""

import json
import os
import shutil
import time
import traceback
from datetime import datetime

from utils.video_processor import VideoProcessor
from utils.mediapipe_utils import MediaPipeProcessor
from utils.huggingface_utils import HuggingFaceUploader

# File extensions the script entry point treats as videos (compared
# case-insensitively).
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')


class LibrasProcessingPipeline:
    """Drive the normalise -> keypoints -> package -> upload workflow.

    The ``config`` mapping must provide the directory keys ``input_dir``,
    ``normalized_dir``, ``keypoints_dir`` and ``huggingface_dir``. The same
    mapping is handed unchanged to the helper processors, which read their
    own settings from it.
    """

    def __init__(self, config):
        """Store ``config`` and make sure all working directories exist."""
        self.config = config
        self.setup_directories()

    def setup_directories(self):
        """Create every configured working directory that is missing."""
        directories = [
            self.config['input_dir'],
            self.config['normalized_dir'],
            self.config['keypoints_dir'],
            self.config['huggingface_dir'],
        ]
        for directory in directories:
            os.makedirs(directory, exist_ok=True)
            print(f"Diretório criado/verificado: {directory}")

    def _should_upload(self):
        """Return True when the configuration asks for the upload step.

        The flag is accepted both at the top level of the config
        (``config['upload_to_hf']``, the key this class originally read) and
        inside the ``'huggingface'`` section, which is where the default
        configuration in this file actually defines it.
        """
        if self.config.get('upload_to_hf', False):
            return True
        hf_settings = self.config.get('huggingface')
        if isinstance(hf_settings, dict):
            return bool(hf_settings.get('upload_to_hf', False))
        return False

    def run_pipeline(self, video_filename):
        """Run all pipeline stages for one video inside ``input_dir``.

        Args:
            video_filename: File name (not a path) of the video to process.

        Returns:
            A dict with the keys ``normalized_video``, ``keypoints_json`` and
            ``hf_ready`` on success, or ``None`` if any stage raised. Errors
            are reported on stdout together with the traceback instead of
            being propagated, so callers must check for ``None``.
        """
        print(f"Iniciando processamento do vídeo: {video_filename}")

        try:
            # 1. Video normalisation
            print("1. Normalizando vídeo...")
            normalized_path = self.normalize_video(video_filename)

            # 2. Keypoint extraction
            print("2. Extraindo keypoints...")
            keypoints_data = self.extract_keypoints(normalized_path)

            # 3. Persist keypoints as JSON
            print("3. Salvando keypoints...")
            json_path = self.save_keypoints(keypoints_data, video_filename)

            # 4. Assemble the upload-ready folder
            print("4. Preparando para Hugging Face...")
            hf_ready_path = self.prepare_for_huggingface(
                normalized_path, json_path, video_filename
            )

            # 5. Optional upload
            if self._should_upload():
                print("5. Upload para Hugging Face...")
                self.upload_to_huggingface(hf_ready_path)

            print("Pipeline concluído com sucesso!")
            return {
                'normalized_video': normalized_path,
                'keypoints_json': json_path,
                'hf_ready': hf_ready_path,
            }

        except Exception as e:
            # Deliberate best-effort boundary: report and signal failure
            # through the return value rather than crashing the caller.
            print(f"Erro no pipeline: {e}")
            traceback.print_exc()
            return None

    def normalize_video(self, video_filename):
        """Normalise a video from ``input_dir`` into ``normalized_dir``.

        The actual work is delegated to ``VideoProcessor.normalize_video``;
        this method only resolves the paths and validates the input.

        Args:
            video_filename: File name of the video inside ``input_dir``.

        Returns:
            Whatever ``VideoProcessor.normalize_video`` returns (the rest of
            the pipeline uses it as the path of the normalised video).

        Raises:
            FileNotFoundError: If the input video does not exist.
        """
        processor = VideoProcessor(self.config)

        input_path = os.path.join(self.config['input_dir'], video_filename)
        if not os.path.exists(input_path):
            raise FileNotFoundError(f"Vídeo não encontrado: {input_path}")

        output_filename = f"normalized_{video_filename}"
        output_path = os.path.join(
            self.config['normalized_dir'], output_filename
        )

        return processor.normalize_video(input_path, output_path)

    def extract_keypoints(self, video_path):
        """Return the result of ``MediaPipeProcessor.process_video``.

        ``save_keypoints`` consumes the result as an iterable of per-frame
        dicts.
        """
        processor = MediaPipeProcessor(self.config)
        return processor.process_video(video_path)

    @staticmethod
    def _to_serializable(value):
        """Recursively convert ``value`` into JSON-serialisable builtins.

        Anything exposing ``tolist()`` (e.g. NumPy arrays and scalars) is
        converted through it; dicts, lists and tuples are walked so that
        nested array-likes are converted as well. Other values are returned
        unchanged.
        """
        if hasattr(value, 'tolist'):
            return value.tolist()
        if isinstance(value, dict):
            return {
                key: LibrasProcessingPipeline._to_serializable(item)
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [
                LibrasProcessingPipeline._to_serializable(item)
                for item in value
            ]
        return value

    def save_keypoints(self, keypoints_data, original_filename):
        """Write the keypoints to ``<base>_keypoints.json`` in ``keypoints_dir``.

        Args:
            keypoints_data: Iterable of per-frame dicts. Values may be (or
                contain, at any nesting depth) objects with a ``tolist()``
                method; these are converted to plain lists first.
            original_filename: Name of the source video; its stem is used to
                build the output file name.

        Returns:
            Path of the JSON file that was written.
        """
        base_name = os.path.splitext(original_filename)[0]
        output_filename = f"{base_name}_keypoints.json"
        output_path = os.path.join(
            self.config['keypoints_dir'], output_filename
        )

        serializable_data = [
            {
                key: self._to_serializable(value)
                for key, value in frame_data.items()
            }
            for frame_data in keypoints_data
        ]

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(serializable_data, f, indent=2, ensure_ascii=False)

        print(f"Keypoints salvos em: {output_path}")
        return output_path

    def prepare_for_huggingface(self, video_path, json_path, original_filename):
        """Collect video, keypoints and metadata in one per-video folder.

        A folder named after the video stem is created under
        ``huggingface_dir``. Inputs that do not exist on disk are skipped
        silently; the metadata then records ``0`` frames / an empty
        resolution dict for them.

        Args:
            video_path: Path of the normalised video.
            json_path: Path of the keypoints JSON file.
            original_filename: Name of the source video.

        Returns:
            Path of the folder that was populated.
        """
        base_name = os.path.splitext(original_filename)[0]
        hf_dir = os.path.join(self.config['huggingface_dir'], base_name)
        os.makedirs(hf_dir, exist_ok=True)

        video_exists = os.path.exists(video_path)

        # Copy the normalised video.
        # NOTE(review): the copy is always named "<base>.mp4", whatever the
        # extension of video_path is — confirm the normaliser emits MP4.
        video_dest = os.path.join(hf_dir, f"{base_name}.mp4")
        if video_exists:
            shutil.copy2(video_path, video_dest)
            print(f"Vídeo copiado para: {video_dest}")

        # Copy the keypoints JSON.
        json_dest = os.path.join(hf_dir, f"{base_name}_keypoints.json")
        if os.path.exists(json_path):
            shutil.copy2(json_path, json_dest)
            print(f"JSON copiado para: {json_dest}")

        # Build the metadata record. load_json() already returns [] for a
        # missing file, so no extra existence check is needed for the count.
        metadata = {
            'processing_date': datetime.now().isoformat(),
            'original_video': original_filename,
            'keypoints_format': 'MediaPipe Holistic',
            'frame_count': len(self.load_json(json_path)),
            'video_resolution': (
                self.get_video_info(video_path) if video_exists else {}
            ),
        }

        metadata_path = os.path.join(hf_dir, 'metadata.json')
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

        print(f"Metadata criado em: {metadata_path}")
        return hf_dir

    def upload_to_huggingface(self, directory_path):
        """Upload ``directory_path`` via ``HuggingFaceUploader.upload_directory``.

        Returns:
            Whatever the uploader returns.
        """
        uploader = HuggingFaceUploader(self.config)
        return uploader.upload_directory(directory_path)

    def load_json(self, json_path):
        """Return the parsed content of ``json_path``, or ``[]`` if missing."""
        if not os.path.exists(json_path):
            return []

        with open(json_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def get_video_info(self, video_path):
        """Read width, height and FPS of a video with OpenCV.

        Returns:
            Dict with the keys ``width`` (int), ``height`` (int) and ``fps``
            (as reported by ``cv2.VideoCapture.get``).
        """
        # Imported lazily so the module can be loaded without OpenCV when
        # this method is never called.
        import cv2

        cap = cv2.VideoCapture(video_path)
        try:
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
        finally:
            # Always free the capture handle, even if a property read fails.
            cap.release()

        return {'width': width, 'height': height, 'fps': fps}


# Default configuration
config = {
    'input_dir': 'input',
    'normalized_dir': 'output/normalized',
    'keypoints_dir': 'output/keypoints',
    'huggingface_dir': 'output/huggingface',
    'video_normalization': {
        'target_resolution': (640, 480),
        'target_fps': 30,
        'normalize_brightness': True,
        'enhance_contrast': True
    },
    'mediapipe_config': {
        'static_image_mode': False,
        'model_complexity': 1,
        'smooth_landmarks': True,
        'min_detection_confidence': 0.5,
        'min_tracking_confidence': 0.5
    },
    'huggingface': {
        'repo_id': 'your-username/your-repo-name',
        'token': 'your-hf-token',
        'upload_to_hf': False
    }
}


def main():
    """Process the first video found in the configured input directory."""
    # Instantiating the pipeline creates the working directories.
    pipeline = LibrasProcessingPipeline(config)

    # Look for candidate videos. Extensions are matched case-insensitively
    # and the listing is sorted because os.listdir() order is arbitrary.
    input_dir = config['input_dir']
    video_files = sorted(
        f for f in os.listdir(input_dir)
        if f.lower().endswith(VIDEO_EXTENSIONS)
    )

    if not video_files:
        print(f"Nenhum vídeo encontrado na pasta '{input_dir}'")
        print("Por favor, coloque um vídeo na pasta input/")
        return

    # Process the first video found.
    video_filename = video_files[0]
    print(f"Processando: {video_filename}")

    start_time = time.time()
    result = pipeline.run_pipeline(video_filename)
    end_time = time.time()

    if result:
        print(f"\n✅ Processamento concluído em {end_time - start_time:.2f} segundos")
        print("📁 Resultados:")
        for key, path in result.items():
            print(f" {key}: {path}")
    else:
        print("❌ Falha no processamento")


if __name__ == "__main__":
    main()