Spaces:
Running
Running
| from pathlib import Path | |
| from typing import Optional | |
| import hashlib, base64, binascii, math, subprocess, shutil, os, logging, uuid, tempfile | |
| from dotenv import load_dotenv | |
| from imageio_ffmpeg import get_ffmpeg_exe | |
| import boto3 | |
| load_dotenv() | |
| logger = logging.getLogger(__name__) | |
| # ---------- HASH UTILS ---------- | |
| def _sha256(path: Path, chunk: int = 1 << 20) -> str: | |
| h = hashlib.sha256() | |
| with path.open("rb") as f: | |
| for b in iter(lambda: f.read(chunk), b""): | |
| h.update(b) | |
| return h.hexdigest() | |
| def _hashid_short(sha256_hex: str, length: int = 12) -> str: | |
| b = binascii.unhexlify(sha256_hex) | |
| return base64.urlsafe_b64encode(b).decode().rstrip("=")[:length] | |
| # ---------- FFMPEG ---------- | |
| def _ffmpeg_bin() -> str: | |
| try: | |
| return get_ffmpeg_exe() | |
| except Exception: | |
| p = os.environ.get("FFMPEG_BIN") or shutil.which("ffmpeg") | |
| if not p: | |
| raise FileNotFoundError("ffmpeg not found. Install it or set FFMPEG_BIN") | |
| return p | |
| FFMPEG = _ffmpeg_bin() | |
| def _run_ffmpeg(cmd: list[str]) -> None: | |
| cmd = cmd[:] | |
| cmd[0] = FFMPEG | |
| proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| if proc.returncode != 0: | |
| raise RuntimeError(proc.stderr.decode(errors="ignore") or "ffmpeg failed") | |
| # ---------- THUMBNAIL ---------- | |
| def extract_thumbnail(video_path: str, time_position: str = "00:00:01") -> str: | |
| """ | |
| Extracts a thumbnail (JPEG) from the video and returns it as a base64 string. | |
| Default frame at 1 second. | |
| """ | |
| tmp_thumb = Path(tempfile.gettempdir()) / f"{Path(video_path).stem}_thumb.jpg" | |
| cmd = [ | |
| "ffmpeg", "-nostdin", "-y", | |
| "-ss", time_position, | |
| "-i", video_path, | |
| "-frames:v", "1", | |
| "-q:v", "2", | |
| str(tmp_thumb), | |
| ] | |
| _run_ffmpeg(cmd) | |
| with open(tmp_thumb, "rb") as f: | |
| encoded = base64.b64encode(f.read()).decode("utf-8") | |
| return encoded | |
| # ---------- AUGMENT VIDEO ---------- | |
| def augment_video_random( | |
| *, | |
| input_path: str, | |
| output_path: Optional[str] = None, | |
| crf: int = 20, | |
| preset: str = "medium", | |
| ) -> str: | |
| import random | |
| k_b, k_c, k_h, k_s = [random.randint(-5, 5) for _ in range(4)] | |
| brightness = max(-1.0, min(1.0, k_b * 0.05)) | |
| contrast = max(0.0, min(2.0, 1.0 + k_c * 0.05)) | |
| hue_rad = math.radians(k_h * 5.0) | |
| sat_scale = max(0.0, 1.0 + k_s * 0.05) | |
| vf = f"hue=h={hue_rad:.6f}:s={sat_scale:.4f},eq=contrast={contrast:.4f}:brightness={brightness:.4f}" | |
| inp = Path(input_path) | |
| out = Path(output_path) / f"{inp.stem}_augmented.mp4" if output_path else inp.with_name(f"{inp.stem}_augmented.mp4") | |
| cmd = [ | |
| "ffmpeg", "-nostdin", "-y", | |
| "-i", str(inp), | |
| "-map", "0:v:0", "-map", "0:a?", | |
| "-vf", vf, | |
| "-c:v", "libx264", "-preset", preset, "-crf", str(crf), | |
| "-pix_fmt", "yuv420p", | |
| "-c:a", "copy", | |
| "-movflags", "+faststart", | |
| str(out), | |
| ] | |
| _run_ffmpeg(cmd) | |
| return str(out) | |
| # ---------- VIDEO UPLOAD TO R2 ---------- | |
| def upload_video_to_r2(video_bytes: bytes, file_name: str) -> str: | |
| s3 = boto3.client( | |
| "s3", | |
| endpoint_url=os.getenv("R2_ENDPOINT"), | |
| aws_access_key_id=os.getenv("R2_ACCESS_KEY"), | |
| aws_secret_access_key=os.getenv("R2_SECRET_KEY"), | |
| region_name="auto", | |
| ) | |
| bucket = os.getenv("R2_BUCKET_NAME") | |
| key = f"videos/{uuid.uuid4()}_{file_name}" | |
| s3.put_object(Bucket=bucket, Key=key, Body=video_bytes, ContentType="video/mp4") | |
| return f"{os.getenv('NEW_BASE').rstrip('/')}/{key}" | |
| # ---------- PROCESS + UPLOAD ---------- | |
| def process_video_with_hash_info(input_path: str, output_path: Optional[str] = None) -> dict: | |
| in_sha = _sha256(Path(input_path)) | |
| in_id = _hashid_short(in_sha) | |
| # Augment and get output | |
| out_path = augment_video_random(input_path=input_path, output_path=output_path) | |
| # output hashid | |
| out_sha = _sha256(Path(out_path)) | |
| out_id = _hashid_short(out_sha) | |
| # copy file to safe tmp path | |
| safe_tmp = Path(tempfile.gettempdir()) / Path(out_path).name | |
| shutil.copy(out_path, safe_tmp) | |
| # thumbnail | |
| try: | |
| thumbnail_b64 = extract_thumbnail(str(safe_tmp)) | |
| except Exception as e: | |
| logger.error(f"Thumbnail extraction failed: {e}") | |
| thumbnail_b64 = "" | |
| try: | |
| with open(safe_tmp, "rb") as f: | |
| video_bytes = f.read() | |
| r2_url = upload_video_to_r2(video_bytes, Path(safe_tmp).name) | |
| except Exception as e: | |
| logger.error(f"Upload to R2 failed: {e}") | |
| r2_url = None | |
| return { | |
| "input_hashid": in_id, | |
| "output_name": Path(safe_tmp).name, | |
| "output_path": str(safe_tmp), | |
| "output_hashid": out_id, | |
| "output_r2_url": r2_url, | |
| "thumbnail": thumbnail_b64, | |
| } |