Spaces:
Sleeping
Sleeping
| import os | |
| import random | |
| import re | |
| import struct | |
| import gradio as gr | |
| import subprocess | |
| # Основная функция для обработки видео | |
| def glitch_video(file, mode, countframes, positframes, audio, firstframe, kill): | |
| if not file: | |
| return "Error: Valid input file required!" | |
| filein = file.name | |
| temp_nb = random.randint(10000, 99999) | |
| temp_dir = f"temp-{temp_nb}" | |
| temp_hdrl = os.path.join(temp_dir, "hdrl.bin") | |
| temp_movi = os.path.join(temp_dir, "movi.bin") | |
| temp_idx1 = os.path.join(temp_dir, "idx1.bin") | |
| if not os.path.exists(temp_dir): | |
| os.mkdir(temp_dir) | |
| # Функция для чтения до маркера | |
| def bstream_until_marker(bfilein, bfileout, marker=None, startpos=0): | |
| chunk_size = 1024 | |
| filesize = os.path.getsize(bfilein) | |
| marker = str.encode(marker) if marker else None | |
| with open(bfilein, 'rb') as rd, open(bfileout, 'ab') as wr: | |
| for pos in range(startpos, filesize, chunk_size): | |
| rd.seek(pos) | |
| buffer = rd.read(chunk_size) | |
| if marker: | |
| marker_pos = buffer.find(marker) | |
| if marker_pos >= 0: | |
| marker_pos += pos | |
| wr.write(buffer[:marker_pos - pos]) | |
| return marker_pos | |
| else: | |
| wr.write(buffer) | |
| else: | |
| wr.write(buffer) | |
| return None | |
| # Разделение на части | |
| movi_marker_pos = bstream_until_marker(filein, temp_hdrl, "movi") | |
| idx1_marker_pos = bstream_until_marker(filein, temp_movi, "idx1", movi_marker_pos) | |
| bstream_until_marker(filein, temp_idx1, startpos=idx1_marker_pos) | |
| # Создание индекса кадров | |
| clean = [] | |
| with open(temp_movi, 'rb') as rd: | |
| filesize = os.path.getsize(temp_movi) | |
| frame_table = [] | |
| for pos in range(0, filesize, 1024): | |
| rd.seek(pos) | |
| buffer = rd.read(1024) | |
| for marker in (b'\x30\x31\x77\x62', b'\x30\x30\x64\x63'): | |
| frame_table.extend([(m.start() + pos, marker) for m in re.finditer(marker, buffer)]) | |
| frame_table.sort(key=lambda x: x[0]) | |
| max_frame_size = max((frame_table[i + 1][0] - frame_table[i][0]) if i + 1 < len(frame_table) else filesize - frame_table[i][0] for i in range(len(frame_table))) | |
| for i, (pos, marker) in enumerate(frame_table): | |
| frame_size = (frame_table[i + 1][0] - pos) if i + 1 < len(frame_table) else (filesize - pos) | |
| if frame_size <= max_frame_size * kill: # Фильтрация больших кадров | |
| clean.append((pos, frame_size, marker)) | |
| # Применение эффектов | |
| if mode == "void": | |
| final = clean | |
| elif mode == "random": | |
| final = random.sample(clean, len(clean)) | |
| elif mode == "reverse": | |
| final = clean[::-1] | |
| elif mode == "invert": | |
| final = sum(zip(clean[1::2], clean[::2]), ()) | |
| elif mode == "bloom": | |
| final = clean[:int(positframes)] + [clean[int(positframes)]] * int(countframes) + clean[int(positframes):] | |
| elif mode == "pulse": | |
| final = [val for i, val in enumerate(clean) for _ in range(int(countframes)) if i % int(positframes) == 0] + clean | |
| elif mode == "jiggle": | |
| final = [clean[min(max(0, x + int(random.gauss(0, int(positframes)))), len(clean) - 1)] for x in range(len(clean))] | |
| elif mode == "overlap": | |
| final = [item for sublist in [clean[i:i+int(countframes)] for i in range(0, len(clean), int(positframes)) ] for item in sublist] | |
| # Сборка видео обратно | |
| fileout = f"{os.path.splitext(filein)[0]}-{mode}.avi" | |
| if os.path.exists(fileout): | |
| os.remove(fileout) | |
| with open(fileout, "wb") as wr: | |
| bstream_until_marker(temp_hdrl, fileout) | |
| wr.write(b"movi") | |
| with open(temp_movi, "rb") as rd: | |
| for pos, size, _ in final: | |
| rd.seek(pos) | |
| wr.write(rd.read(size)) | |
| bstream_until_marker(temp_idx1, fileout) | |
| # Конвертация в .mp4 для обеспечения совместимости | |
| fileout_mp4 = f"{os.path.splitext(filein)[0]}-{mode}.mp4" | |
| cmd = f"ffmpeg -i {fileout} -c:v libx264 -crf 18 -preset veryfast {fileout_mp4}" | |
| subprocess.run(cmd, shell=True, check=True) | |
| # Очистка временных файлов | |
| os.remove(temp_hdrl) | |
| os.remove(temp_movi) | |
| os.remove(temp_idx1) | |
| os.remove(fileout) | |
| os.rmdir(temp_dir) | |
| return fileout_mp4 | |
| # Интерфейс Gradio | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## Полнофункциональный Glitch Tool") | |
| input_file = gr.File(label="Загрузите AVI-файл", file_types=[".avi"]) | |
| mode = gr.Dropdown(choices=["void", "random", "reverse", "invert", "bloom", "pulse", "jiggle", "overlap"], label="Режим") | |
| countframes = gr.Slider(1, 10, step=1, label="Частота глитчей", value=1) | |
| positframes = gr.Slider(1, 10, step=1, label="Длительность глитча", value=1) | |
| audio = gr.Checkbox(label="Сохранить аудио", value=False) | |
| firstframe = gr.Checkbox(label="Сохранить первый кадр", value=True) | |
| kill = gr.Slider(0.1, 1.0, step=0.1, label="Интенсивность фильтрации", value=0.7) | |
| output_file = gr.File(label="Скачайте обработанный файл") | |
| submit_button = gr.Button("Обработать") | |
| submit_button.click( | |
| glitch_video, | |
| inputs=[input_file, mode, countframes, positframes, audio, firstframe, kill], | |
| outputs=output_file, | |
| ) | |
| # Запуск Gradio | |
| demo.launch(share=True) |