Spaces:
Sleeping
Sleeping
File size: 6,098 Bytes
e5be641 144e9fc e5be641 144e9fc e5be641 553627f 144e9fc e5be641 144e9fc e5be641 553627f e5be641 553627f e5be641 fe55513 e5be641 b856b54 144e9fc 33095ba e5be641 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
import gradio as gr
import cv2
import os
import zipfile
from PIL import Image, ImageOps
from datetime import datetime
import hashlib
import shutil
from concurrent.futures import ThreadPoolExecutor
TEMP_CACHE = None
def guardar_frame(frame, index, temp_dir):
    """Write one decoded video frame to disk as a JPEG and return its path.

    Args:
        frame: Frame as returned by ``cv2.VideoCapture.read`` (converted
            here from BGR to RGB channel order before saving).
        index: Position of the frame in the video; used to build the
            zero-padded file name ``frame_NNNN.jpg``.
        temp_dir: Directory in which the image file is created.

    Returns:
        The path of the saved JPEG inside ``temp_dir``.
    """
    # Swap channel order so Pillow sees RGB data.
    imagen = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    destino = os.path.join(temp_dir, f"frame_{index:04d}.jpg")
    imagen.save(destino)
    return destino
def _md5_de_archivo(path, chunk_size=1024 * 1024):
    """Return the hex MD5 digest of the file at ``path``, read in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as fh:
        # Chunked reads keep memory flat regardless of the video size.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _extraer_fotogramas(video_path, temp_dir):
    """Save every frame of the video into ``temp_dir``; return paths in order."""
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise gr.Error("No se pudo abrir el archivo de video. Formato no soportado o archivo dañado.")
        futures = []
        with ThreadPoolExecutor(max_workers=4) as executor:
            index = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                futures.append(executor.submit(guardar_frame, frame, index, temp_dir))
                index += 1
    finally:
        # Release the capture handle on every path, including errors.
        cap.release()
    # Futures were submitted in frame order, so the results are too.
    return [f.result() for f in futures]


def _construir_collage(selected_frames):
    """Build the 2x2 reference collage from four sample frame image paths."""
    images = []
    for img_path in selected_frames:
        # ImageOps.expand returns a new image, so the file can be closed here.
        with Image.open(img_path) as img:
            images.append(ImageOps.expand(img, border=2, fill='white'))

    img_w, img_h = images[0].size
    margin = 30
    border_size = 20
    shadow_offset = 5
    collage_width = (img_w * 2) + margin + (border_size * 2)
    collage_height = (img_h * 2) + margin + (border_size * 2)
    collage = Image.new('RGB', (collage_width, collage_height), (230, 230, 230))
    positions = [
        (border_size, border_size),
        (border_size + img_w + margin, border_size),
        (border_size, border_size + img_h + margin),
        (border_size + img_w + margin, border_size + img_h + margin),
    ]
    # The translucent shadow is the same for every tile, so build it once.
    shadow = Image.new('RGBA', (img_w + shadow_offset, img_h + shadow_offset), (0, 0, 0, 50))
    for img, (x, y) in zip(images, positions):
        collage.paste(shadow, (x + shadow_offset, y + shadow_offset), shadow)
        collage.paste(img, (x, y))
    return collage


def procesar_video(video_path):
    """Extract all frames of a video and package them as forensic evidence.

    Every frame is saved as a JPEG in a fresh ``temp_<timestamp>`` directory.
    Four evenly spaced frames are combined into a 2x2 collage, and all frames
    plus a chain-of-custody text file (original name, processing time, frame
    count, MD5 of the video, sampled frame numbers) are written to a ZIP.
    On success the directory is recorded in the module-level ``TEMP_CACHE``.

    Args:
        video_path: Path of the video file to process.

    Returns:
        A ``(collage_path, zip_path)`` tuple of files inside the temp directory.

    Raises:
        gr.Error: If no video was given, the video cannot be opened, it yields
            no frames, or any other step fails (the original exception is
            chained as the cause). The temp directory is removed on failure.
    """
    # NOTE(review): TEMP_CACHE is a single module-level slot; if several
    # sessions use the app at once they share it -- confirm this is intended.
    global TEMP_CACHE

    if not video_path:
        raise gr.Error("No se ha cargado ningún video.")

    # One clock reading so the directory name and the report timestamp agree.
    ahora = datetime.now()
    temp_dir = f"temp_{ahora.strftime('%Y%m%d%H%M%S')}"
    try:
        original_name = os.path.basename(video_path)
        timestamp = ahora.strftime('%Y-%m-%d %H:%M:%S')
        os.makedirs(temp_dir, exist_ok=True)

        frame_paths = _extraer_fotogramas(video_path, temp_dir)
        frame_count = len(frame_paths)
        if frame_count == 0:
            raise gr.Error("No se pudieron extraer fotogramas del video.")

        n_seleccion = 4
        step = max(1, frame_count // (n_seleccion + 1))
        # Clamp here so the custody record lists the frames that were really
        # used, even when the video has fewer frames than sample slots.
        selected_indices = [min(step * (i + 1), frame_count - 1) for i in range(n_seleccion)]
        selected_frames = [frame_paths[i] for i in selected_indices]

        collage_path = os.path.join(temp_dir, "collage_forense.jpg")
        _construir_collage(selected_frames).save(collage_path, quality=95, dpi=(300, 300))

        # NOTE(review): MD5 is kept because the report labels it as such, but it
        # is not collision-resistant; consider also recording SHA-256.
        video_hash = _md5_de_archivo(video_path)
        muestras = ', '.join(f'#{i + 1}' for i in selected_indices)
        # Real CRLF line breaks so the text file is readable line by line.
        chain_content = (
            "=== CADENA DE CUSTODIA DIGITAL ===\r\n\r\n"
            f"• Archivo original: {original_name}\r\n"
            f"• Fecha de procesamiento: {timestamp}\r\n"
            f"• Fotogramas totales: {frame_count}\r\n"
            f"• Hash MD5 video: {video_hash}\r\n"
            f"• Fotogramas muestra: {muestras}\r\n\r\n"
            "Este documento certifica la integridad del proceso de extracción.\r\n"
            "Sistema Certificado por Peritos Forenses Digitales de Guatemala. \r\n"
            "www.forensedigital.gt"
        )

        base_name = os.path.splitext(original_name)[0]
        final_zip_path = os.path.join(temp_dir, f"{base_name}_Fotogramas.zip")
        with zipfile.ZipFile(final_zip_path, mode="w") as zipf:
            for img_path in frame_paths:
                zipf.write(img_path, os.path.basename(img_path))
            zipf.writestr("00_CADENA_CUSTODIA.txt", chain_content)
    except gr.Error:
        # Already a user-facing message: clean up and pass it through unwrapped.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    except Exception as e:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise gr.Error(f"Error en procesamiento: {str(e)}") from e

    TEMP_CACHE = temp_dir
    return collage_path, final_zip_path
def limpiar_cache():
    """Remove the temp directory recorded in ``TEMP_CACHE`` and reset it.

    Reads the module-level ``TEMP_CACHE``; if it holds a path, that directory
    tree is deleted. ``TEMP_CACHE`` is always ``None`` afterwards. Removal is
    best-effort: a missing directory or a file that cannot be deleted does not
    raise, so the UI callback that calls this never fails because of cleanup.
    """
    global TEMP_CACHE
    # Reset the global first so a failed removal cannot leave a stale path.
    cache_dir, TEMP_CACHE = TEMP_CACHE, None
    if cache_dir:
        # ignore_errors replaces the exists() pre-check, which was race-prone.
        shutil.rmtree(cache_dir, ignore_errors=True)
# Gradio UI: upload a video on the left, view the collage and download the
# evidence ZIP on the right.
# NOTE(review): the nesting below is reconstructed; the original indentation
# was lost -- confirm which components belong to which column.
with gr.Blocks(title="Extractor Forense de Fotogramas") as demo:
    gr.Markdown("# 📷 Extractor Forense de Fotogramas de Videos")
    gr.Markdown("**Herramienta para la Extracción Forense de Fotogramas de Videos** (No se guarda ninguna información).")
    gr.Markdown("Desarrollado por José R. Leonett para el Grupo de Peritos Forenses Digitales de Guatemala - [www.forensedigital.gt](https://www.forensedigital.gt)")

    with gr.Row():
        with gr.Column(scale=1):
            video_input = gr.Video(
                label="🎞️ VIDEO CARGADO",
                format="mp4",
                interactive=True,
                height=480,
                sources=["upload"],
            )
            # Starts disabled; enabled once a video is present.
            procesar_btn = gr.Button("🔍 INICIAR ANÁLISIS", interactive=False)
        with gr.Column(scale=1):
            gallery_output = gr.Image(label="📸 COLLAGE DE REFERENCIA", height=400)
            download_file = gr.File(label="📂 DESCARGAR EVIDENCIAS", visible=True)

    def habilitar_procesado(video):
        """Reset state when the video input changes.

        Deletes the previous run's temp files, clears both outputs, and makes
        the analysis button clickable only while a video is actually loaded
        (``video`` is ``None`` when the input has been cleared).
        """
        limpiar_cache()
        return gr.update(interactive=video is not None), None, None

    video_input.change(
        fn=habilitar_procesado,
        inputs=video_input,
        outputs=[procesar_btn, gallery_output, download_file],
        queue=False,
    )

    def procesar_y_mostrar(video):
        """Run the extraction and return ``(collage_path, zip_path)`` for the outputs."""
        collage, zip_path = procesar_video(video)
        return collage, zip_path

    procesar_btn.click(
        fn=procesar_y_mostrar,
        inputs=video_input,
        outputs=[gallery_output, download_file],
    )

# Start the web server only when run as a script; listens on all interfaces.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)
|