File size: 6,098 Bytes
e5be641
 
 
 
 
 
 
 
144e9fc
e5be641
 
 
144e9fc
 
 
 
 
 
 
e5be641
 
 
 
 
 
 
 
 
553627f
 
 
144e9fc
 
 
 
 
 
 
 
 
e5be641
 
144e9fc
 
 
e5be641
553627f
e5be641
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
553627f
 
 
 
 
 
 
 
e5be641
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fe55513
e5be641
 
 
 
 
 
 
b856b54
144e9fc
33095ba
e5be641
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import gradio as gr
import cv2
import os
import zipfile
from PIL import Image, ImageOps
from datetime import datetime
import hashlib
import shutil
from concurrent.futures import ThreadPoolExecutor

TEMP_CACHE = None

def guardar_frame(frame, index, temp_dir):
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img = Image.fromarray(frame_rgb)
    img_path = os.path.join(temp_dir, f"frame_{index:04d}.jpg")
    img.save(img_path)
    return img_path

def procesar_video(video_path):
    try:
        original_name = os.path.basename(video_path)

        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        temp_dir = f"temp_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        os.makedirs(temp_dir, exist_ok=True)

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise gr.Error("No se pudo abrir el archivo de video. Formato no soportado o archivo dañado.")

        index = 0
        futures = []
        with ThreadPoolExecutor(max_workers=4) as executor:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                futures.append(executor.submit(guardar_frame, frame, index, temp_dir))
                index += 1
        cap.release()

        frame_paths = [f.result() for f in futures]
        frame_count = len(frame_paths)

        if frame_count == 0:
            raise gr.Error("No se pudieron extraer fotogramas del video.")

        n_seleccion = 4
        step = max(1, frame_count // (n_seleccion + 1))
        selected_indices = [step * (i+1) for i in range(n_seleccion)]
        selected_frames = [frame_paths[min(i, len(frame_paths)-1)] for i in selected_indices]

        images = []
        for img_path in selected_frames:
            img = Image.open(img_path)
            bordered_img = ImageOps.expand(img, border=2, fill='white')
            images.append(bordered_img)

        img_w, img_h = images[0].size
        margin = 30
        border_size = 20
        shadow_offset = 5

        collage_width = (img_w * 2) + margin + (border_size * 2)
        collage_height = (img_h * 2) + margin + (border_size * 2)

        collage = Image.new('RGB', (collage_width, collage_height), (230, 230, 230))
        positions = [
            (border_size, border_size),
            (border_size + img_w + margin, border_size),
            (border_size, border_size + img_h + margin),
            (border_size + img_w + margin, border_size + img_h + margin)
        ]

        for i, img in enumerate(images):
            shadow = Image.new('RGBA', (img_w + shadow_offset, img_h + shadow_offset), (0,0,0,50))
            collage.paste(shadow, (positions[i][0]+shadow_offset, positions[i][1]+shadow_offset), shadow)
            collage.paste(img, positions[i])

        collage_path = os.path.join(temp_dir, "collage_forense.jpg")
        collage.save(collage_path, quality=95, dpi=(300, 300))

        base_name = os.path.splitext(original_name)[0]
        zip_filename = f"{base_name}_Fotogramas.zip"
        final_zip_path = os.path.join(temp_dir, zip_filename)

        with zipfile.ZipFile(final_zip_path, mode="w") as zipf:
            for img_path in frame_paths:
                zipf.write(img_path, os.path.basename(img_path))

            with open(video_path, "rb") as f:
                video_hash = hashlib.md5(f.read()).hexdigest()

            chain_content = (
                "=== CADENA DE CUSTODIA DIGITAL ===\\r\\n\\r\\n"
                f"• Archivo original: {original_name}\\r\\n"
                f"• Fecha de procesamiento: {timestamp}\\r\\n"
                f"• Fotogramas totales: {frame_count}\\r\\n"
                f"• Hash MD5 video: {video_hash}\\r\\n"
                f"• Fotogramas muestra: {', '.join([f'#{i+1}' for i in selected_indices])}\\r\\n\\r\\n"
                "Este documento certifica la integridad del proceso de extracción.\\n"
                "Sistema Certificado por Peritos Forenses Digitales de Guatemala. \\n"
                "www.forensedigital.gt"
            )
            zipf.writestr("00_CADENA_CUSTODIA.txt", chain_content)

        global TEMP_CACHE
        TEMP_CACHE = temp_dir

        return collage_path, final_zip_path

    except Exception as e:
        raise gr.Error(f"Error en procesamiento: {str(e)}")

def limpiar_cache():
    global TEMP_CACHE
    if TEMP_CACHE and os.path.exists(TEMP_CACHE):
        shutil.rmtree(TEMP_CACHE)
    TEMP_CACHE = None

with gr.Blocks(title="Extractor Forense de Fotogramas") as demo:
    gr.Markdown("# 📷 Extractor Forense de Fotogramas de Videos")
    gr.Markdown("**Herramienta para la Extracción Forense de Fotogramas de Videos**  (No se guarda ninguna información).")
    gr.Markdown("Desarrollado por José R. Leonett para el Grupo de Peritos Forenses Digitales de Guatemala - [www.forensedigital.gt](https://www.forensedigital.gt)")

    with gr.Row():
        with gr.Column(scale=1):
            video_input = gr.Video(
                label="🎞️ VIDEO CARGADO",
                format="mp4",
                interactive=True,
                height=480,
                sources=["upload"]
            )
            procesar_btn = gr.Button("🔍 INICIAR ANÁLISIS", interactive=False)
        with gr.Column(scale=1):
            gallery_output = gr.Image(label="📸 COLLAGE DE REFERENCIA", height=400)
            download_file = gr.File(label="📂 DESCARGAR EVIDENCIAS", visible=True)

    def habilitar_procesado(video):
        limpiar_cache()
        return gr.update(interactive=True), None, None

    video_input.change(
        fn=habilitar_procesado,
        inputs=video_input,
        outputs=[procesar_btn, gallery_output, download_file],
        queue=False
    )

    def procesar_y_mostrar(video):
        collage, zip_path = procesar_video(video)
        return collage, zip_path

    procesar_btn.click(
        fn=procesar_y_mostrar,
        inputs=video_input,
        outputs=[gallery_output, download_file]
    )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)