"""Build and save Clipchamp-style project blobs from an AI-generated clip list.

The module turns a list of clips (source key plus start/end times) and a
mapping of source keys to media files into a zlib-compressed JSON document.
Identifiers are derived with ``uuid5`` from a fixed namespace, so identical
inputs yield identical IDs; only the timestamps differ between runs.
"""

import datetime
import hashlib
import json
import mimetypes
import os
import uuid
import zlib
from typing import Dict, Optional

CLIPCHAMP_OUTPUT_FOLDER = os.environ.get("CLIPCHAMP_OUTPUT_FOLDER", "clipchamp_projects")

# -------------------------
# Configuration / Namespace
# -------------------------
# Use a fixed namespace UUID so uuid5 outputs are deterministic across runs/machines.
_DETERMINISTIC_NAMESPACE = uuid.UUID("11111111-2222-3333-4444-555555555555")

# Extension tables consulted by _guess_type.
_VIDEO_EXTENSIONS = (".mp4", ".mov", ".m4v", ".avi", ".webm")
_AUDIO_EXTENSIONS = (".mp3", ".wav", ".aac")
_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".gif", ".webp")

_FILE_URI_PREFIX = "file://"


# -------------------------
# Helpers
# -------------------------
def _now_iso() -> str:
    """Return the current UTC time as ISO-8601 with millisecond precision and a ``Z`` suffix."""
    # Take an aware UTC timestamp (utcnow() is deprecated), then drop tzinfo so
    # isoformat() does not append "+00:00" in front of the explicit "Z".
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return now.isoformat(timespec="milliseconds") + "Z"


def _local_path(path: Optional[str]) -> Optional[str]:
    """Strip a leading ``file://`` from *path*; return ``None`` for empty input.

    Only the literal prefix is removed; percent-escapes are left untouched.
    """
    if not path:
        return None
    if path.startswith(_FILE_URI_PREFIX):
        return path[len(_FILE_URI_PREFIX):]
    return path


def _float_or(value, default: float) -> float:
    """Convert *value* to ``float``, using *default* when *value* is ``None``.

    Unparseable values (for example ``"abc"``) still raise ``ValueError``.
    """
    return default if value is None else float(value)


def _int_or(value, default: int) -> int:
    """Convert *value* to ``int``, using *default* when *value* is ``None``."""
    return default if value is None else int(value)


def _file_meta(path: Optional[str]) -> Dict:
    """Return ``{"size": ..., "mimeType": ...}`` for a plain path or ``file://`` URI.

    ``size`` is ``None`` when the file does not exist or cannot be inspected;
    ``mimeType`` is guessed from the name alone and may also be ``None``.
    """
    size = None
    mime = None
    local = _local_path(path)
    if local:
        try:
            if os.path.exists(local):
                size = os.path.getsize(local)
        except (OSError, ValueError):
            size = None
        try:
            mime = mimetypes.guess_type(local)[0]
        except (OSError, ValueError):
            mime = None
    return {"size": size, "mimeType": mime}


def _sha1_of_file(path: Optional[str]) -> Optional[str]:
    """Return the hex SHA-1 digest of the file at *path*, or ``None`` if it cannot be read."""
    local = _local_path(path)
    if not local:
        return None
    try:
        digest = hashlib.sha1()
        with open(local, "rb") as handle:
            # Stream in chunks so large media files are never held in memory.
            for chunk in iter(lambda: handle.read(65536), b""):
                digest.update(chunk)
    except (OSError, ValueError):
        # Missing file, directory, permission problem, ...: hashing is best-effort.
        return None
    return digest.hexdigest()


def _guess_type(mime: Optional[str], filename: str) -> str:
    """Classify an asset as ``"video"``, ``"audio"``, ``"image"`` or ``"file"``.

    A ``video/*`` MIME type wins, then the extension tables are consulted, and
    finally an ``audio/*`` or ``image/*`` MIME type is honoured so that formats
    missing from the tables (e.g. ``.aiff``, ``.bmp``) are still classified.
    """
    if mime and mime.startswith("video"):
        return "video"
    ext = os.path.splitext(filename)[1].lower()
    if ext in _VIDEO_EXTENSIONS:
        return "video"
    if ext in _AUDIO_EXTENSIONS:
        return "audio"
    if ext in _IMAGE_EXTENSIONS:
        return "image"
    if mime and mime.startswith("audio"):
        return "audio"
    if mime and mime.startswith("image"):
        return "image"
    return "file"


# deterministic id functions (uuid5)
def deterministic_project_id(project_name: str) -> str:
    """Return the uuid5-based project ID for *project_name*."""
    return str(uuid.uuid5(_DETERMINISTIC_NAMESPACE, f"project:{project_name}"))


def deterministic_asset_id(key_or_path: str) -> str:
    """Return the uuid5-based asset ID for *key_or_path*.

    ``key_or_path`` should be unique per asset (e.g. "video_1" or an absolute path).
    """
    return str(uuid.uuid5(_DETERMINISTIC_NAMESPACE, f"asset:{key_or_path}"))


def deterministic_item_id(project_id: str, index: int, source_key: str, start_ms: Optional[int]) -> str:
    """Return the uuid5-based timeline item ID for a clip of *project_id*."""
    base = f"item:{project_id}:{index}:{source_key}:{start_ms}"
    return str(uuid.uuid5(_DETERMINISTIC_NAMESPACE, base))


def deterministic_track_id(project_id: str, track_label: str = "video_track_1") -> str:
    """Return the uuid5-based track ID for *track_label* within *project_id*."""
    return str(uuid.uuid5(_DETERMINISTIC_NAMESPACE, f"track:{project_id}:{track_label}"))


# -------------------------
# Main builder
# -------------------------
def build_clipchamp_bytes(
    project_name: str,
    ai_json: Dict,
    source_to_filename: Dict,
    source_to_duration: Optional[Dict] = None,
    aspect_ratio: Optional[str] = "16:9",
    render_settings: Optional[Dict] = None,
    schema_version: str = "8.6.4",
) -> bytes:
    """Build a zlib-compressed JSON project blob.

    The layout is intended to resemble a Clipchamp export. All IDs are produced
    via uuid5, so repeated runs with identical inputs yield identical IDs; the
    ``createdAt``/``updatedAt``/``importedAt`` timestamps are wall-clock values.

    Args:
        project_name: Display name; also seeds the project ID.
        ai_json: Mapping with an optional ``"clips"`` list. Each clip is a dict
            read for ``source``, ``start_time``, ``end_time``, ``order`` and the
            optional ``start_time_ms``, ``end_time_ms``, ``speed``, ``volume``,
            ``mute``, ``enabled``, ``locked`` and ``position`` keys.
        source_to_filename: Maps a source key to a path or ``file://`` URI
            (the value may be ``None``).
        source_to_duration: Optional map of source key to duration, copied
            into the asset metadata unchanged.
        aspect_ratio: Stored verbatim under ``aspectRatio``.
        render_settings: Stored under ``renderSettings``; a 1920x1080, 30 fps
            mp4 default is used when falsy.
        schema_version: Stored verbatim under ``schemaVersion``.

    Returns:
        The UTF-8 encoded, indented JSON document compressed with zlib.

    Raises:
        ValueError: If a numeric clip field holds a value that cannot be parsed.
    """
    if source_to_duration is None:
        source_to_duration = {}

    created_at = _now_iso()
    project_id = deterministic_project_id(project_name)

    # Base project skeleton using keys observed in Clipchamp exports
    project = {
        "version": 7,
        "schemaVersion": schema_version,
        "id": project_id,
        "name": project_name,
        "createdAt": created_at,
        "updatedAt": created_at,
        "aspectRatio": aspect_ratio,
        "renderSettings": render_settings or {
            "format": "mp4",
            "resolution": {"width": 1920, "height": 1080},
            "frameRate": 30,
        },
        # keep containers expected by Clipchamp
        "entities": {},
        "flavor": {},
        "groups": {},
        "assets": {"byId": {}, "allIds": []},
        "items": {"byId": {}, "allIds": []},
        "tracks": {"byId": {}, "allIds": []},
    }

    # Build assets deterministically
    asset_id_map = {}
    # First asset seen for each file basename; lets clips reference a file by name.
    basename_to_asset_id = {}
    for position, (key, fname) in enumerate(source_to_filename.items()):
        # Prefer the caller's key as the ID seed, then the file basename. The
        # last resort is the entry's position, so the ID stays reproducible
        # even when neither is available.
        if key:
            id_seed = key
        elif fname:
            id_seed = os.path.basename(fname)
        else:
            id_seed = f"unnamed:{position}"
        aid = deterministic_asset_id(id_seed)
        asset_id_map[key] = aid
        if fname:
            basename_to_asset_id.setdefault(os.path.basename(fname), aid)

        meta = _file_meta(fname)
        sha1 = _sha1_of_file(fname)
        filename = os.path.basename(fname) if fname else (key or "unknown")

        asset_obj = {
            "id": aid,
            "name": filename,
            "type": _guess_type(meta["mimeType"], filename),
            "loop": False,
            "hidden": False,
            # include sha1 if available
            "fileHashes": {"sha1": sha1} if sha1 else {},
            "metadata": {
                "duration": source_to_duration.get(key),
                "size": meta["size"],
                "mimeType": meta["mimeType"],
            },
            # richer extData: preserve originalFileName, uri, importedAt and importedSize
            "extData": {
                "originalFileName": filename,
                "uri": fname,
                "importedAt": created_at,
                "importedSize": meta["size"],
                # keep a deterministic file id too (useful for Clipchamp internals)
                "fileId": deterministic_asset_id(f"file:{filename}:{meta['size']}"),
            },
        }
        project["assets"]["byId"][aid] = asset_obj
        project["assets"]["allIds"].append(aid)

    # Single default video track (deterministic id)
    track_id = deterministic_track_id(project_id, "video_track_1")
    track_items = []
    project["tracks"]["byId"][track_id] = {
        "id": track_id,
        "type": "video",
        "index": 0,
        "items": track_items,
        "muted": False,
    }
    project["tracks"]["allIds"].append(track_id)

    # Build timeline items from ai_json['clips'] preserving order
    clips = ai_json.get("clips") or []
    sorted_clips = sorted(clips, key=lambda c: (c.get("order") or 0))
    cursor = 0.0
    # idx counts every clip, including skipped ones, so item IDs do not shift
    # when an invalid entry is dropped.
    for idx, c in enumerate(sorted_clips, start=1):
        source_key = c.get("source")
        if source_key is None:
            # skip invalid entry
            continue

        # start/end times may be provided as floats, strings or None
        start = _float_or(c.get("start_time"), 0.0)
        end = _float_or(c.get("end_time"), start)
        duration = max(0.0, end - start)

        # deterministic item id based on project, index, source and source start (ms)
        start_ms = _int_or(c.get("start_time_ms"), round(start * 1000))
        end_ms = _int_or(c.get("end_time_ms"), round(end * 1000))
        item_id = deterministic_item_id(project_id, idx, source_key, start_ms)

        # map asset id by source key, falling back to a match on file basename
        asset_id = asset_id_map.get(source_key)
        if asset_id is None:
            asset_id = basename_to_asset_id.get(source_key)

        item_obj = {
            "id": item_id,
            "assetId": asset_id,
            "type": "timeline_item",
            # position on timeline (seconds)
            "startTime": cursor,
            "duration": duration,
            # trim ranges on the source asset (seconds)
            "trim": {"from": start, "to": end},
            # source ms fields
            "source_start_ms": start_ms,
            "source_end_ms": end_ms,
            "speed": _float_or(c.get("speed"), 1.0),
            "volume": 0.0 if c.get("mute") else _float_or(c.get("volume"), 1.0),
            "enabled": bool(c.get("enabled", True)),
            "locked": bool(c.get("locked", False)),
            "position": c.get("position", {"x": 0, "y": 0, "scale": 1.0, "rotation": 0}),
            "order": c.get("order"),
            # extData for traceability: original source key and 1-based clip index
            "extData": {
                "orig_source_key": source_key,
                "orig_index": idx,
            },
        }

        project["items"]["byId"][item_id] = item_obj
        project["items"]["allIds"].append(item_id)
        # add to track
        track_items.append(item_id)

        # advance cursor: clips are laid end to end
        cursor += duration

    # timeline (duration and per-track mapping)
    project["timeline"] = {"duration": cursor, "tracks": {}}
    for tid, t in project["tracks"]["byId"].items():
        project["timeline"]["tracks"][tid] = {
            "id": tid,
            "items": t["items"],
            "duration": cursor,
        }

    # finalize updated timestamp
    project["updatedAt"] = _now_iso()

    # pretty JSON, keys in insertion order
    json_bytes = json.dumps(project, indent=2, ensure_ascii=False).encode("utf-8")
    return zlib.compress(json_bytes)


# -------------------------
# File-saving wrapper
# -------------------------
def save_clipchamp_project(
    project_name: str,
    ai_json: Dict,
    source_to_filename: Dict,
    source_to_duration: Optional[Dict] = None,
    output_folder: Optional[str] = None,
    **build_kwargs
) -> str:
    """Build a .clipchamp project from AI JSON and save it to disk.

    Args:
        project_name: Project name; a sanitised copy becomes the file name.
        ai_json: Clip description forwarded to ``build_clipchamp_bytes``.
        source_to_filename: Source key to file mapping, forwarded unchanged.
        source_to_duration: Optional source key to duration mapping.
        output_folder: Target directory; defaults to ``CLIPCHAMP_OUTPUT_FOLDER``.
            It is created when missing.
        **build_kwargs: Extra keyword arguments for ``build_clipchamp_bytes``.

    Returns:
        Absolute path of the saved file.
    """
    folder = output_folder or CLIPCHAMP_OUTPUT_FOLDER
    os.makedirs(folder, exist_ok=True)

    clip_bytes = build_clipchamp_bytes(
        project_name, ai_json, source_to_filename, source_to_duration, **build_kwargs
    )

    # sanitize project_name for filename: anything other than alphanumerics,
    # space, "_" and "-" becomes "_"
    safe_name = "".join(
        c if c.isalnum() or c in (" ", "_", "-") else "_" for c in project_name
    ).strip()
    # An empty or all-whitespace name would otherwise produce a bare ".clipchamp" file.
    if not safe_name:
        safe_name = "project"

    out_path = os.path.join(folder, f"{safe_name}.clipchamp")
    with open(out_path, "wb") as f:
        f.write(clip_bytes)
    return os.path.abspath(out_path)