cngsm's picture
Update app.py
2908964 verified
import os
import json
import time
from datetime import datetime
from utils.video_processor import VideoProcessor
from utils.mediapipe_utils import MediaPipeProcessor
from utils.huggingface_utils import HuggingFaceUploader
class LibrasProcessingPipeline:
def __init__(self, config):
self.config = config
self.setup_directories()
def setup_directories(self):
"""Cria os diretórios necessários automaticamente"""
directories = [
self.config['input_dir'],
self.config['normalized_dir'],
self.config['keypoints_dir'],
self.config['huggingface_dir']
]
for directory in directories:
os.makedirs(directory, exist_ok=True)
print(f"Diretório criado/verificado: {directory}")
def run_pipeline(self, video_filename):
"""Executa o pipeline completo"""
print(f"Iniciando processamento do vídeo: {video_filename}")
try:
# 1. Normalização do vídeo
print("1. Normalizando vídeo...")
normalized_path = self.normalize_video(video_filename)
# 2. Extração de keypoints com MediaPipe
print("2. Extraindo keypoints...")
keypoints_data = self.extract_keypoints(normalized_path)
# 3. Salvar JSON com keypoints
print("3. Salvando keypoints...")
json_path = self.save_keypoints(keypoints_data, video_filename)
# 4. Preparar para Hugging Face
print("4. Preparando para Hugging Face...")
hf_ready_path = self.prepare_for_huggingface(
normalized_path, json_path, video_filename
)
# 5. Upload para Hugging Face (opcional)
if self.config.get('upload_to_hf', False):
print("5. Upload para Hugging Face...")
self.upload_to_huggingface(hf_ready_path)
print("Pipeline concluído com sucesso!")
return {
'normalized_video': normalized_path,
'keypoints_json': json_path,
'hf_ready': hf_ready_path
}
except Exception as e:
print(f"Erro no pipeline: {e}")
import traceback
traceback.print_exc()
return None
def normalize_video(self, video_filename):
"""Normaliza o vídeo usando OpenCV/FFmpeg"""
processor = VideoProcessor(self.config)
input_path = os.path.join(self.config['input_dir'], video_filename)
# Verifica se o arquivo existe
if not os.path.exists(input_path):
raise FileNotFoundError(f"Vídeo não encontrado: {input_path}")
output_filename = f"normalized_{video_filename}"
output_path = os.path.join(self.config['normalized_dir'], output_filename)
return processor.normalize_video(input_path, output_path)
def extract_keypoints(self, video_path):
"""Extrai keypoints usando MediaPipe"""
processor = MediaPipeProcessor(self.config)
return processor.process_video(video_path)
def save_keypoints(self, keypoints_data, original_filename):
"""Salva os keypoints em formato JSON"""
base_name = os.path.splitext(original_filename)[0]
output_filename = f"{base_name}_keypoints.json"
output_path = os.path.join(self.config['keypoints_dir'], output_filename)
# Converter numpy arrays para listas para serialização JSON
serializable_data = []
for frame_data in keypoints_data:
serializable_frame = {}
for key, value in frame_data.items():
if hasattr(value, 'tolist'):
serializable_frame[key] = value.tolist()
else:
serializable_frame[key] = value
serializable_data.append(serializable_frame)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(serializable_data, f, indent=2, ensure_ascii=False)
print(f"Keypoints salvos em: {output_path}")
return output_path
def prepare_for_huggingface(self, video_path, json_path, original_filename):
"""Prepara os arquivos para upload no Hugging Face"""
base_name = os.path.splitext(original_filename)[0]
hf_dir = os.path.join(self.config['huggingface_dir'], base_name)
os.makedirs(hf_dir, exist_ok=True)
# Copiar vídeo normalizado
video_dest = os.path.join(hf_dir, f"{base_name}.mp4")
if os.path.exists(video_path):
import shutil
shutil.copy2(video_path, video_dest)
print(f"Vídeo copiado para: {video_dest}")
# Copiar JSON de keypoints
json_dest = os.path.join(hf_dir, f"{base_name}_keypoints.json")
if os.path.exists(json_path):
import shutil
shutil.copy2(json_path, json_dest)
print(f"JSON copiado para: {json_dest}")
# Criar metadata
metadata = {
'processing_date': datetime.now().isoformat(),
'original_video': original_filename,
'keypoints_format': 'MediaPipe Holistic',
'frame_count': len(self.load_json(json_path)) if os.path.exists(json_path) else 0,
'video_resolution': self.get_video_info(video_path) if os.path.exists(video_path) else {}
}
metadata_path = os.path.join(hf_dir, 'metadata.json')
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
print(f"Metadata criado em: {metadata_path}")
return hf_dir
def upload_to_huggingface(self, directory_path):
"""Faz upload para o Hugging Face Space"""
uploader = HuggingFaceUploader(self.config)
return uploader.upload_directory(directory_path)
def load_json(self, json_path):
"""Carrega arquivo JSON"""
if not os.path.exists(json_path):
return []
with open(json_path, 'r', encoding='utf-8') as f:
return json.load(f)
def get_video_info(self, video_path):
"""Obtém informações do vídeo"""
import cv2
cap = cv2.VideoCapture(video_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
cap.release()
return {'width': width, 'height': height, 'fps': fps}
# Configuração
config = {
'input_dir': 'input',
'normalized_dir': 'output/normalized',
'keypoints_dir': 'output/keypoints',
'huggingface_dir': 'output/huggingface',
'video_normalization': {
'target_resolution': (640, 480),
'target_fps': 30,
'normalize_brightness': True,
'enhance_contrast': True
},
'mediapipe_config': {
'static_image_mode': False,
'model_complexity': 1,
'smooth_landmarks': True,
'min_detection_confidence': 0.5,
'min_tracking_confidence': 0.5
},
'huggingface': {
'repo_id': 'your-username/your-repo-name',
'token': 'your-hf-token',
'upload_to_hf': False
}
}
if __name__ == "__main__":
# Criar diretórios automaticamente
pipeline = LibrasProcessingPipeline(config)
# Verificar se existe vídeo na pasta input
input_dir = config['input_dir']
video_files = [f for f in os.listdir(input_dir) if f.endswith(('.mp4', '.avi', '.mov'))]
if not video_files:
print(f"Nenhum vídeo encontrado na pasta '{input_dir}'")
print("Por favor, coloque um vídeo na pasta input/")
else:
# Processar o primeiro vídeo encontrado
video_filename = video_files[0]
print(f"Processando: {video_filename}")
start_time = time.time()
result = pipeline.run_pipeline(video_filename)
end_time = time.time()
if result:
print(f"\n✅ Processamento concluído em {end_time - start_time:.2f} segundos")
print(f"📁 Resultados:")
for key, path in result.items():
print(f" {key}: {path}")
else:
print("❌ Falha no processamento")