Spaces:
Running
Running
File size: 4,818 Bytes
638b572 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
from pathlib import Path
from typing import Optional
import hashlib, base64, binascii, math, subprocess, shutil, os, logging, uuid, tempfile
from dotenv import load_dotenv
from imageio_ffmpeg import get_ffmpeg_exe
import boto3
# Load settings from a .env file (python-dotenv) before anything below reads
# the environment: FFMPEG_BIN in _ffmpeg_bin and the R2_* / NEW_BASE values in
# upload_video_to_r2 are all fetched from os.environ.
load_dotenv()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# ---------- HASH UTILS ----------
def _sha256(path: Path, chunk: int = 1 << 20) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is consumed in pieces of at most *chunk* bytes (1 MiB by default)
    so the whole file never has to be held in memory at once.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            piece = stream.read(chunk)
            if piece == b"":
                # End of file reached.
                break
            digest.update(piece)
    return digest.hexdigest()
def _hashid_short(sha256_hex: str, length: int = 12) -> str:
    """Turn a hex digest into a short URL-safe identifier.

    The hex string is decoded to raw bytes, encoded with the URL-safe base64
    alphabet ("-" and "_" instead of "+" and "/"), stripped of trailing "="
    padding, and cut to at most *length* characters.
    """
    raw = binascii.unhexlify(sha256_hex)
    encoded = base64.b64encode(raw, altchars=b"-_").decode("ascii")
    unpadded = encoded.rstrip("=")
    return unpadded[:length]
# ---------- FFMPEG ----------
def _ffmpeg_bin() -> str:
    """Return the path of an ffmpeg executable.

    The binary reported by ``get_ffmpeg_exe()`` is preferred. If that call
    raises, the ``FFMPEG_BIN`` environment variable is consulted, and finally
    an ``ffmpeg`` found on ``PATH``.

    Raises:
        FileNotFoundError: if none of the three sources yields a binary.
    """
    try:
        bundled = get_ffmpeg_exe()
    except Exception:
        fallback = os.environ.get("FFMPEG_BIN") or shutil.which("ffmpeg")
        if fallback:
            return fallback
        raise FileNotFoundError("ffmpeg not found. Install it or set FFMPEG_BIN")
    else:
        return bundled
# Resolved once, at import time: importing this module raises
# FileNotFoundError when _ffmpeg_bin() cannot locate any ffmpeg binary.
FFMPEG = _ffmpeg_bin()
def _run_ffmpeg(cmd: list[str]) -> None:
    """Run an ffmpeg command line and fail loudly on a non-zero exit status.

    The first element of *cmd* is only a placeholder: a copy of the list is
    made and its first element is replaced by the resolved ``FFMPEG`` binary,
    so the caller's list is left untouched. stdout and stderr are captured.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status; the message is
            ffmpeg's stderr output, or "ffmpeg failed" when stderr is empty.
    """
    argv = cmd[:]
    argv[0] = FFMPEG
    completed = subprocess.run(argv, capture_output=True)
    if completed.returncode == 0:
        return
    message = completed.stderr.decode(errors="ignore")
    raise RuntimeError(message or "ffmpeg failed")
# ---------- THUMBNAIL ----------
def extract_thumbnail(video_path: str, time_position: str = "00:00:01") -> str:
    """
    Extract one JPEG frame from a video and return it as a base64 string.

    Args:
        video_path: Path of the video file handed to ffmpeg.
        time_position: Seek position passed to ffmpeg's ``-ss`` option;
            defaults to the frame at 1 second.

    Returns:
        The JPEG bytes, base64-encoded and decoded to text as UTF-8.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
        FileNotFoundError: if ffmpeg succeeds but writes no frame.
    """
    # Work in a private scratch directory. A fixed "<stem>_thumb.jpg" in the
    # shared temp dir would pile up on disk, could be overwritten by a
    # concurrent call for a video with the same stem, and could hand back a
    # stale JPEG from an earlier run when ffmpeg produces no frame.
    with tempfile.TemporaryDirectory(prefix="thumb_") as scratch:
        thumb_file = Path(scratch) / f"{Path(video_path).stem}_thumb.jpg"
        cmd = [
            "ffmpeg", "-nostdin", "-y",
            "-ss", time_position,
            "-i", video_path,
            "-frames:v", "1",
            "-q:v", "2",
            str(thumb_file),
        ]
        _run_ffmpeg(cmd)
        # Read before the context manager removes the scratch directory.
        return base64.b64encode(thumb_file.read_bytes()).decode("utf-8")
# ---------- AUGMENT VIDEO ----------
def augment_video_random(
    *,
    input_path: str,
    output_path: Optional[str] = None,
    crf: int = 20,
    preset: str = "medium",
) -> str:
    """
    Re-encode a video with small random colour adjustments.

    Four integers are drawn uniformly from [-5, 5] and mapped to:
    brightness (steps of 0.05), contrast (1.0 plus steps of 0.05),
    hue rotation (steps of 5 degrees) and saturation (1.0 plus steps of 0.05).
    Video is encoded with libx264 / yuv420p; audio, if present, is copied.

    Args:
        input_path: Source video file.
        output_path: Optional directory for the result. When omitted, the
            result is written next to the input file.
        crf: x264 constant rate factor.
        preset: x264 speed/quality preset.

    Returns:
        Path of the written file, named ``<input stem>_augmented.mp4``.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    import random

    k_b, k_c, k_h, k_s = (random.randint(-5, 5) for _ in range(4))
    brightness = max(-1.0, min(1.0, k_b * 0.05))
    contrast = max(0.0, min(2.0, 1.0 + k_c * 0.05))
    hue_rad = math.radians(k_h * 5.0)
    sat_scale = max(0.0, 1.0 + k_s * 0.05)

    # ffmpeg's hue filter reads lowercase "h" as degrees and uppercase "H" as
    # radians. The angle here is in radians, so it must be passed as "H";
    # with "h" the rotation would shrink to a fraction of a degree.
    vf = (
        f"hue=H={hue_rad:.6f}:s={sat_scale:.4f},"
        f"eq=contrast={contrast:.4f}:brightness={brightness:.4f}"
    )

    inp = Path(input_path)
    out_name = f"{inp.stem}_augmented.mp4"
    # output_path, when given, is treated as a directory, not a file name.
    out = Path(output_path) / out_name if output_path else inp.with_name(out_name)

    cmd = [
        "ffmpeg", "-nostdin", "-y",
        "-i", str(inp),
        "-map", "0:v:0", "-map", "0:a?",
        "-vf", vf,
        "-c:v", "libx264", "-preset", preset, "-crf", str(crf),
        "-pix_fmt", "yuv420p",
        "-c:a", "copy",
        "-movflags", "+faststart",
        str(out),
    ]
    _run_ffmpeg(cmd)
    return str(out)
# ---------- VIDEO UPLOAD TO R2 ----------
def upload_video_to_r2(video_bytes: bytes, file_name: str) -> str:
    """
    Upload video bytes to the configured R2 bucket and return the public URL.

    Configuration is read from the environment: ``R2_ENDPOINT``,
    ``R2_ACCESS_KEY``, ``R2_SECRET_KEY``, ``R2_BUCKET_NAME`` and ``NEW_BASE``
    (the base URL that the object key is appended to).

    Args:
        video_bytes: Full contents of the video file.
        file_name: Name appended to a random UUID to form the object key
            ``videos/<uuid>_<file_name>``.

    Returns:
        ``NEW_BASE`` (without trailing slashes) joined with the object key.

    Raises:
        RuntimeError: if ``R2_BUCKET_NAME`` or ``NEW_BASE`` is not set.
    """
    bucket = os.getenv("R2_BUCKET_NAME")
    public_base = os.getenv("NEW_BASE")
    # Check these before any network call: a missing NEW_BASE would otherwise
    # only fail after the object had already been uploaded, leaving an orphan.
    required = (("R2_BUCKET_NAME", bucket), ("NEW_BASE", public_base))
    missing = [name for name, value in required if value is None]
    if missing:
        raise RuntimeError(
            f"Missing required environment variable(s): {', '.join(missing)}"
        )

    s3 = boto3.client(
        "s3",
        endpoint_url=os.getenv("R2_ENDPOINT"),
        aws_access_key_id=os.getenv("R2_ACCESS_KEY"),
        aws_secret_access_key=os.getenv("R2_SECRET_KEY"),
        region_name="auto",
    )
    key = f"videos/{uuid.uuid4()}_{file_name}"
    s3.put_object(Bucket=bucket, Key=key, Body=video_bytes, ContentType="video/mp4")
    return f"{public_base.rstrip('/')}/{key}"
# ---------- PROCESS + UPLOAD ----------
def process_video_with_hash_info(input_path: str, output_path: Optional[str] = None) -> dict:
    """
    Augment a video, then thumbnail and upload the result on a best-effort basis.

    Steps: hash the input, run ``augment_video_random``, hash the output, copy
    the output into the system temp directory, extract a thumbnail from that
    copy and upload the copy to R2. Thumbnail and upload failures are logged
    and do not abort the call; hashing and augmentation failures propagate.

    Args:
        input_path: Source video file.
        output_path: Optional output directory forwarded to
            ``augment_video_random``.

    Returns:
        A dict with keys ``input_hashid``, ``output_name``, ``output_path``
        (the temp-directory copy), ``output_hashid``, ``output_r2_url``
        (``None`` if the upload failed) and ``thumbnail`` (base64 JPEG, or
        ``""`` if extraction failed).
    """
    in_id = _hashid_short(_sha256(Path(input_path)))

    # Augment, then fingerprint the freshly written file.
    out_path = augment_video_random(input_path=input_path, output_path=output_path)
    out_id = _hashid_short(_sha256(Path(out_path)))

    # Copy the result into the system temp directory.
    safe_tmp = Path(tempfile.gettempdir()) / Path(out_path).name
    try:
        shutil.copy(out_path, safe_tmp)
    except shutil.SameFileError:
        # The augmented file was already written into the temp directory
        # (input lives there, or output_path points at it): nothing to copy.
        pass

    # Thumbnail: best effort, an empty string signals failure.
    try:
        thumbnail_b64 = extract_thumbnail(str(safe_tmp))
    except Exception as e:
        logger.error("Thumbnail extraction failed: %s", e)
        thumbnail_b64 = ""

    # Upload: best effort, None signals failure.
    try:
        video_bytes = safe_tmp.read_bytes()
        r2_url = upload_video_to_r2(video_bytes, safe_tmp.name)
    except Exception as e:
        logger.error("Upload to R2 failed: %s", e)
        r2_url = None

    return {
        "input_hashid": in_id,
        "output_name": safe_tmp.name,
        "output_path": str(safe_tmp),
        "output_hashid": out_id,
        "output_r2_url": r2_url,
        "thumbnail": thumbnail_b64,
    }