|
|
from __future__ import annotations |
|
|
|
|
|
import streamlit as st |
|
|
import tempfile |
|
|
import os |
|
|
from pathlib import Path |
|
|
from typing import Optional |
|
|
import math, random, hashlib, base64, binascii, subprocess, shutil |
|
|
import logging |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def _ffmpeg_bin() -> str: |
|
|
try: |
|
|
from imageio_ffmpeg import get_ffmpeg_exe |
|
|
return get_ffmpeg_exe() |
|
|
except Exception: |
|
|
p = os.environ.get("FFMPEG_BIN") or shutil.which("ffmpeg") |
|
|
if not p: |
|
|
raise FileNotFoundError( |
|
|
"ffmpeg not found. Install Homebrew ffmpeg or `pip install imageio-ffmpeg`, " |
|
|
"or set FFMPEG_BIN to the binary path." |
|
|
) |
|
|
return p |
|
|
|
|
|
FFMPEG = _ffmpeg_bin() |
|
|
|
|
|
def _run_ffmpeg(cmd: list[str]) -> None: |
|
|
cmd = cmd[:] ; cmd[0] = FFMPEG |
|
|
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|
|
if proc.returncode != 0: |
|
|
raise RuntimeError(proc.stderr.decode(errors="ignore") or "ffmpeg failed") |
|
|
|
|
|
def _next_available(p: Path) -> Path: |
|
|
if not p.exists(): return p |
|
|
stem, suf = p.stem, p.suffix |
|
|
i = 1 |
|
|
while True: |
|
|
q = p.with_name(f"{stem}_{i}{suf}") |
|
|
if not q.exists(): return q |
|
|
i += 1 |
|
|
|
|
|
def _derive_out(in_path: Path, out_path: Optional[str]) -> Path: |
|
|
if out_path is None: |
|
|
p = in_path.with_name(f"{in_path.stem}_augmented2.mp4") |
|
|
else: |
|
|
p = Path(out_path) |
|
|
if p.is_dir() or str(out_path).endswith(("/", "\\")): |
|
|
p = Path(out_path) / f"{in_path.stem}_augmented2.mp4" |
|
|
if p.suffix == "": |
|
|
p = p.with_suffix(".mp4") |
|
|
p = _next_available(p) |
|
|
p.parent.mkdir(parents=True, exist_ok=True) |
|
|
return p |
|
|
|
|
|
def _sha256(path: Path, chunk: int = 1 << 20) -> str: |
|
|
h = hashlib.sha256() |
|
|
with path.open("rb") as f: |
|
|
for b in iter(lambda: f.read(chunk), b""): h.update(b) |
|
|
return h.hexdigest() |
|
|
|
|
|
def _hashid_short(sha256_hex: str, length: int = 12) -> str: |
|
|
b = binascii.unhexlify(sha256_hex) |
|
|
return base64.urlsafe_b64encode(b).decode().rstrip("=")[:length] |
|
|
|
|
|
def _rand_color_hex() -> str: |
|
|
return f"0x{random.randrange(0, 0xFFFFFF+1):06x}" |
|
|
|
|
|
def augment_video_random( |
|
|
*, |
|
|
input_path: str, |
|
|
output_path: Optional[str] = None, |
|
|
temporal_radius: int = 5, |
|
|
seed: Optional[int] = None, |
|
|
crf: int = 20, |
|
|
preset: str = "medium", |
|
|
) -> str: |
|
|
if seed is not None: |
|
|
random.seed(seed) |
|
|
|
|
|
k_b = random.randint(-5, 5) |
|
|
k_c = random.randint(-5, 5) |
|
|
k_h = random.randint(-5, 5) |
|
|
k_s = random.randint(-5, 5) |
|
|
k_w = random.randint(-5, 5) |
|
|
|
|
|
brightness = max(-1.0, min(1.0, k_b * 0.05)) |
|
|
contrast = max(0.0, min(2.0, 1.0 + k_c * 0.05)) |
|
|
hue_rad = math.radians(k_h * 5.0) |
|
|
sat_scale = max(0.0, 1.0 + k_s * 0.05) |
|
|
border_px = max(1, min(5, abs(k_w))) |
|
|
color = _rand_color_hex() |
|
|
|
|
|
parts = [] |
|
|
if temporal_radius > 0: |
|
|
parts.append(f"deflicker=size={2*temporal_radius+1}") |
|
|
parts += [ |
|
|
f"hue=h={hue_rad:.6f}:s={sat_scale:.4f}", |
|
|
f"eq=contrast={contrast:.4f}:brightness={brightness:.4f}", |
|
|
] |
|
|
if border_px: |
|
|
parts.append(f"pad=iw+{2*border_px}:ih+{2*border_px}:{border_px}:{border_px}:color={color}") |
|
|
vf_with = ",".join(parts) |
|
|
vf_no = ",".join(parts[1:]) if temporal_radius > 0 else vf_with |
|
|
|
|
|
inp = Path(input_path) |
|
|
if not inp.exists(): raise FileNotFoundError(f"Input not found: {inp}") |
|
|
out = _derive_out(inp, output_path) |
|
|
|
|
|
def _encode(vf: str) -> None: |
|
|
cmd = [ |
|
|
"ffmpeg", "-nostdin", "-y", |
|
|
"-i", str(inp), |
|
|
"-map", "0:v:0", "-map", "0:a?", |
|
|
"-vf", vf, |
|
|
"-c:v", "libx264", "-preset", preset, "-crf", str(crf), |
|
|
"-pix_fmt", "yuv420p", |
|
|
"-c:a", "copy", |
|
|
"-movflags", "+faststart", |
|
|
str(out), |
|
|
] |
|
|
try: |
|
|
_run_ffmpeg(cmd) |
|
|
except RuntimeError as e: |
|
|
msg = str(e).lower() |
|
|
if "no such filter" in msg and "deflicker" in msg and vf != vf_no: |
|
|
_encode(vf_no) |
|
|
elif "could not find tag for codec" in msg: |
|
|
cmd2 = cmd[:] |
|
|
i = cmd2.index("-c:a") + 1 |
|
|
cmd2[i] = "aac" |
|
|
cmd2.insert(i + 1, "192k") |
|
|
cmd2.insert(i + 1, "-b:a") |
|
|
_run_ffmpeg(cmd2) |
|
|
else: |
|
|
raise |
|
|
|
|
|
_encode(vf_with) |
|
|
return str(out) |
|
|
|
|
|
def process_video_with_hash_info(input_path: str, output_path: Optional[str] = None) -> dict: |
|
|
out_path = augment_video_random(input_path=input_path, output_path=output_path) |
|
|
inp = Path(input_path); out = Path(out_path) |
|
|
in_sha = _sha256(inp); in_id = _hashid_short(in_sha) |
|
|
out_sha = _sha256(out); out_id = _hashid_short(out_sha) |
|
|
|
|
|
return { |
|
|
"input_path": str(inp), |
|
|
"input_sha256": in_sha, |
|
|
"input_hashid": in_id, |
|
|
"output_path": str(out), |
|
|
"output_name": out.name, |
|
|
"output_sha256": out_sha, |
|
|
"output_hashid": out_id, |
|
|
} |
|
|
|
|
|
def check_token(token: str) -> tuple[bool, str]: |
|
|
""" |
|
|
Check if the provided token is valid. |
|
|
Returns (is_valid, error_message) |
|
|
""" |
|
|
|
|
|
access_token = os.getenv("ACCESS_TOKEN") |
|
|
|
|
|
if not access_token: |
|
|
|
|
|
if token in valid_tokens: |
|
|
return True, valid_tokens[token] |
|
|
else: |
|
|
return False, "Invalid access token. Please contact administrator." |
|
|
|
|
|
|
|
|
if token == access_token: |
|
|
return True, "Access granted" |
|
|
else: |
|
|
return False, "Invalid access token. Please contact administrator." |
|
|
|
|
|
def create_main_app(): |
|
|
"""Main application content after authentication""" |
|
|
|
|
|
|
|
|
st.markdown("# Video Hash Augmentation Tool") |
|
|
st.markdown("---") |
|
|
|
|
|
|
|
|
col_upload, col_generate = st.columns([2, 1]) |
|
|
|
|
|
with col_upload: |
|
|
uploaded_file = st.file_uploader( |
|
|
"Choose a video file", |
|
|
type=['mp4', 'avi', 'mov', 'mkv', 'wmv', 'flv', 'webm'], |
|
|
help="Upload your video to generate an augmented version with a different hash" |
|
|
) |
|
|
|
|
|
with col_generate: |
|
|
generate_clicked = st.button( |
|
|
"Generate Augmented Video", |
|
|
type="primary", |
|
|
use_container_width=True, |
|
|
disabled=uploaded_file is None |
|
|
) |
|
|
|
|
|
if uploaded_file is None: |
|
|
st.info("Upload a video and click 'Generate' to process.") |
|
|
elif generate_clicked: |
|
|
st.success("Video processed successfully!") |
|
|
|
|
|
if uploaded_file is not None: |
|
|
st.success(f"File uploaded: {uploaded_file.name} ({uploaded_file.size / 1024 / 1024:.1f} MB)") |
|
|
st.markdown("---") |
|
|
|
|
|
|
|
|
col1, col2 = st.columns([1, 1], gap="large") |
|
|
|
|
|
with col1: |
|
|
st.markdown("### Original Video") |
|
|
st.video(uploaded_file) |
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{uploaded_file.name}") as tmp_file: |
|
|
tmp_file.write(uploaded_file.getvalue()) |
|
|
tmp_file_path = tmp_file.name |
|
|
|
|
|
original_hash = _sha256(Path(tmp_file_path)) |
|
|
original_hashid = _hashid_short(original_hash) |
|
|
|
|
|
st.markdown("**Hash:**") |
|
|
st.code(original_hashid) |
|
|
|
|
|
os.unlink(tmp_file_path) |
|
|
|
|
|
with col2: |
|
|
st.markdown("### Augmented Video") |
|
|
|
|
|
if generate_clicked: |
|
|
try: |
|
|
with st.spinner("Processing video..."): |
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{uploaded_file.name}") as tmp_input: |
|
|
tmp_input.write(uploaded_file.getvalue()) |
|
|
tmp_input_path = tmp_input.name |
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir: |
|
|
result = process_video_with_hash_info( |
|
|
input_path=tmp_input_path, |
|
|
output_path=tmp_dir |
|
|
) |
|
|
|
|
|
with open(result["output_path"], "rb") as f: |
|
|
processed_video_bytes = f.read() |
|
|
|
|
|
st.video(processed_video_bytes) |
|
|
|
|
|
st.markdown("**Hash:**") |
|
|
st.code(result['output_hashid']) |
|
|
|
|
|
|
|
|
st.markdown("---") |
|
|
st.download_button( |
|
|
label="Download Augmented Video", |
|
|
data=processed_video_bytes, |
|
|
file_name=result['output_name'], |
|
|
mime="video/mp4", |
|
|
type="primary", |
|
|
use_container_width=True |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"Error processing video: {str(e)}") |
|
|
st.info("Make sure FFmpeg is properly installed on your system.") |
|
|
|
|
|
finally: |
|
|
try: |
|
|
os.unlink(tmp_input_path) |
|
|
except: |
|
|
pass |
|
|
else: |
|
|
st.info("Please upload a video file to get started.") |
|
|
|
|
|
def main(): |
|
|
st.set_page_config(page_title="Video Hash Augmentation Tool", layout="wide") |
|
|
|
|
|
if "authenticated" not in st.session_state: |
|
|
st.session_state["authenticated"] = False |
|
|
|
|
|
if not st.session_state["authenticated"]: |
|
|
st.markdown("## Access Required") |
|
|
token_input = st.text_input("Enter Access Token", type="password") |
|
|
|
|
|
if st.button("Unlock App"): |
|
|
ok, error_msg = check_token(token_input) |
|
|
if ok: |
|
|
st.session_state["authenticated"] = True |
|
|
st.rerun() |
|
|
else: |
|
|
st.error(error_msg) |
|
|
else: |
|
|
create_main_app() |
|
|
|
|
|
if __name__ == "__main__": |
|
|
try: |
|
|
logger.info("Launching Streamlit app...") |
|
|
main() |
|
|
except Exception as e: |
|
|
logger.exception("Unhandled error during app launch.") |