Fotogramas / app.py
leonett's picture
Update app.py
b856b54 verified
import gradio as gr
import cv2
import os
import zipfile
from PIL import Image, ImageOps
from datetime import datetime
import hashlib
import shutil
from concurrent.futures import ThreadPoolExecutor
TEMP_CACHE = None
def guardar_frame(frame, index, temp_dir):
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = Image.fromarray(frame_rgb)
img_path = os.path.join(temp_dir, f"frame_{index:04d}.jpg")
img.save(img_path)
return img_path
def procesar_video(video_path):
try:
original_name = os.path.basename(video_path)
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
temp_dir = f"temp_{datetime.now().strftime('%Y%m%d%H%M%S')}"
os.makedirs(temp_dir, exist_ok=True)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise gr.Error("No se pudo abrir el archivo de video. Formato no soportado o archivo dañado.")
index = 0
futures = []
with ThreadPoolExecutor(max_workers=4) as executor:
while True:
ret, frame = cap.read()
if not ret:
break
futures.append(executor.submit(guardar_frame, frame, index, temp_dir))
index += 1
cap.release()
frame_paths = [f.result() for f in futures]
frame_count = len(frame_paths)
if frame_count == 0:
raise gr.Error("No se pudieron extraer fotogramas del video.")
n_seleccion = 4
step = max(1, frame_count // (n_seleccion + 1))
selected_indices = [step * (i+1) for i in range(n_seleccion)]
selected_frames = [frame_paths[min(i, len(frame_paths)-1)] for i in selected_indices]
images = []
for img_path in selected_frames:
img = Image.open(img_path)
bordered_img = ImageOps.expand(img, border=2, fill='white')
images.append(bordered_img)
img_w, img_h = images[0].size
margin = 30
border_size = 20
shadow_offset = 5
collage_width = (img_w * 2) + margin + (border_size * 2)
collage_height = (img_h * 2) + margin + (border_size * 2)
collage = Image.new('RGB', (collage_width, collage_height), (230, 230, 230))
positions = [
(border_size, border_size),
(border_size + img_w + margin, border_size),
(border_size, border_size + img_h + margin),
(border_size + img_w + margin, border_size + img_h + margin)
]
for i, img in enumerate(images):
shadow = Image.new('RGBA', (img_w + shadow_offset, img_h + shadow_offset), (0,0,0,50))
collage.paste(shadow, (positions[i][0]+shadow_offset, positions[i][1]+shadow_offset), shadow)
collage.paste(img, positions[i])
collage_path = os.path.join(temp_dir, "collage_forense.jpg")
collage.save(collage_path, quality=95, dpi=(300, 300))
base_name = os.path.splitext(original_name)[0]
zip_filename = f"{base_name}_Fotogramas.zip"
final_zip_path = os.path.join(temp_dir, zip_filename)
with zipfile.ZipFile(final_zip_path, mode="w") as zipf:
for img_path in frame_paths:
zipf.write(img_path, os.path.basename(img_path))
with open(video_path, "rb") as f:
video_hash = hashlib.md5(f.read()).hexdigest()
chain_content = (
"=== CADENA DE CUSTODIA DIGITAL ===\\r\\n\\r\\n"
f"• Archivo original: {original_name}\\r\\n"
f"• Fecha de procesamiento: {timestamp}\\r\\n"
f"• Fotogramas totales: {frame_count}\\r\\n"
f"• Hash MD5 video: {video_hash}\\r\\n"
f"• Fotogramas muestra: {', '.join([f'#{i+1}' for i in selected_indices])}\\r\\n\\r\\n"
"Este documento certifica la integridad del proceso de extracción.\\n"
"Sistema Certificado por Peritos Forenses Digitales de Guatemala. \\n"
"www.forensedigital.gt"
)
zipf.writestr("00_CADENA_CUSTODIA.txt", chain_content)
global TEMP_CACHE
TEMP_CACHE = temp_dir
return collage_path, final_zip_path
except Exception as e:
raise gr.Error(f"Error en procesamiento: {str(e)}")
def limpiar_cache():
global TEMP_CACHE
if TEMP_CACHE and os.path.exists(TEMP_CACHE):
shutil.rmtree(TEMP_CACHE)
TEMP_CACHE = None
with gr.Blocks(title="Extractor Forense de Fotogramas") as demo:
gr.Markdown("# 📷 Extractor Forense de Fotogramas de Videos")
gr.Markdown("**Herramienta para la Extracción Forense de Fotogramas de Videos** (No se guarda ninguna información).")
gr.Markdown("Desarrollado por José R. Leonett para el Grupo de Peritos Forenses Digitales de Guatemala - [www.forensedigital.gt](https://www.forensedigital.gt)")
with gr.Row():
with gr.Column(scale=1):
video_input = gr.Video(
label="🎞️ VIDEO CARGADO",
format="mp4",
interactive=True,
height=480,
sources=["upload"]
)
procesar_btn = gr.Button("🔍 INICIAR ANÁLISIS", interactive=False)
with gr.Column(scale=1):
gallery_output = gr.Image(label="📸 COLLAGE DE REFERENCIA", height=400)
download_file = gr.File(label="📂 DESCARGAR EVIDENCIAS", visible=True)
def habilitar_procesado(video):
limpiar_cache()
return gr.update(interactive=True), None, None
video_input.change(
fn=habilitar_procesado,
inputs=video_input,
outputs=[procesar_btn, gallery_output, download_file],
queue=False
)
def procesar_y_mostrar(video):
collage, zip_path = procesar_video(video)
return collage, zip_path
procesar_btn.click(
fn=procesar_y_mostrar,
inputs=video_input,
outputs=[gallery_output, download_file]
)
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860)