"""
ViralCut Agent - Runtime
========================
The actual agent that uses the fine-tuned model to edit videos autonomously.

This connects the trained model to real tools:
- FFmpeg for video editing
- DuckDuckGo for web search (free, no API key)
- Whisper for transcription
- PySceneDetect for shot detection

Usage:
    python agent.py --video raw_footage.mp4 --platform tiktok --niche food
    python agent.py --plan --niche "coffee shop" --platform tiktok
"""
|
|
| import argparse |
| import json |
| import os |
| import re |
| import subprocess |
| import sys |
| import tempfile |
| from pathlib import Path |
|
|
| |
| |
| |
|
|
class FFmpegTool:
    """Execute FFmpeg commands for video/audio processing."""

    # Number of trailing stderr characters reported when a command fails.
    _ERROR_TAIL_CHARS = 500

    @staticmethod
    def run(command: str, description: str = "", timeout: float = 120) -> str:
        """Run a command line through the shell and report the outcome as JSON.

        Args:
            command: Complete command line to execute.
            description: Label echoed in the log and in the success message.
            timeout: Seconds to wait before the command is aborted.

        Returns:
            A JSON string of the form
            ``{"status": "success" | "error", "message": ...}``. Failures are
            reported in the JSON rather than raised.
        """
        print(f" 🎬 FFmpeg: {description}")
        print(f" $ {command}")

        if not isinstance(command, str) or not command.strip():
            return json.dumps({"status": "error", "message": "Empty command"})

        try:
            # NOTE(review): shell=True executes whatever string is passed in.
            # It is kept because callers may rely on shell syntax (chaining,
            # redirection); confirm the command source is trusted or sandboxed.
            result = subprocess.run(
                command, shell=True, capture_output=True, text=True, timeout=timeout
            )
        except subprocess.TimeoutExpired:
            return json.dumps(
                {"status": "error", "message": f"Command timed out after {timeout}s"}
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)})

        if result.returncode == 0:
            return json.dumps(
                {"status": "success", "message": f"Command completed: {description}"}
            )

        # Report the END of stderr: the beginning is typically a start-up
        # banner, while the actual failure reason is printed last.
        stderr_tail = result.stderr.strip()[-FFmpegTool._ERROR_TAIL_CHARS:]
        message = stderr_tail or f"Command exited with code {result.returncode}"
        return json.dumps({"status": "error", "message": message})
|
|
|
|
class WebSearchTool:
    """Search the web using DuckDuckGo (free, no API key needed)."""

    _MAX_RESULTS = 5
    _DESCRIPTION_CHARS = 200

    @staticmethod
    def _format_result(raw: dict) -> dict:
        """Normalize one raw search hit into ``title`` / ``url`` / ``description``.

        Missing or ``None`` fields become empty strings so that one incomplete
        hit cannot make the whole search fail.
        """
        return {
            "title": raw.get("title") or "",
            "url": raw.get("href") or "",
            "description": (raw.get("body") or "")[:WebSearchTool._DESCRIPTION_CHARS],
        }

    @staticmethod
    def search(query: str, search_type: str = "general") -> str:
        """Search the web and return the hits as a JSON string.

        Args:
            query: Search terms passed to DuckDuckGo.
            search_type: Category label; it is only echoed in the log line.

        Returns:
            ``{"results": [...]}`` on success, ``{"results": [], "error": ...}``
            when the search raises, or a single install hint when the
            ``duckduckgo_search`` package cannot be imported.
        """
        print(f" 🔍 Searching: {query} (type: {search_type})")

        # Imported lazily so the module loads without the optional package.
        try:
            from duckduckgo_search import DDGS
        except ImportError:
            return json.dumps({"results": [{"title": "Install duckduckgo-search", "description": "pip install duckduckgo-search"}]})

        try:
            with DDGS() as ddgs:
                results = [
                    WebSearchTool._format_result(hit)
                    for hit in ddgs.text(query, max_results=WebSearchTool._MAX_RESULTS)
                ]
        except Exception as e:
            return json.dumps({"results": [], "error": str(e)})

        return json.dumps({"results": results})
|
|
|
|
class VideoAnalyzer:
    """Analyze video files using ffprobe and PySceneDetect."""

    @staticmethod
    def _parse_frame_rate(rate) -> float:
        """Convert a ``"num/den"`` (or plain number) rate string to a float.

        Returns 0.0 when the value is malformed or the denominator is zero.
        The value comes from the analyzed file's metadata, so it is parsed
        explicitly instead of being passed to ``eval``.
        """
        numerator, _, denominator = str(rate).partition("/")
        try:
            num = float(numerator)
            den = float(denominator) if denominator else 1.0
        except ValueError:
            return 0.0
        return num / den if den else 0.0

    @staticmethod
    def _probe(video_path: str, result: dict) -> None:
        """Add container and stream metadata from ffprobe to *result*."""
        try:
            # Argument list (no shell) so quotes or shell metacharacters in
            # the path cannot break or alter the command.
            probe = subprocess.run(
                [
                    "ffprobe", "-v", "quiet", "-print_format", "json",
                    "-show_format", "-show_streams", video_path,
                ],
                capture_output=True, text=True,
            )
            if probe.returncode != 0:
                result["probe_error"] = f"ffprobe exited with code {probe.returncode}"
                return

            info = json.loads(probe.stdout)
            fmt = info.get("format", {})
            result["duration"] = float(fmt.get("duration", 0))
            result["size_mb"] = round(int(fmt.get("size", 0)) / 1024 / 1024, 1)

            for stream in info.get("streams", []):
                kind = stream.get("codec_type")
                if kind == "video":
                    result["resolution"] = f"{stream.get('width')}x{stream.get('height')}"
                    result["fps"] = VideoAnalyzer._parse_frame_rate(
                        stream.get("r_frame_rate", "30/1")
                    )
                    result["codec"] = stream.get("codec_name")
                elif kind == "audio":
                    result["audio_codec"] = stream.get("codec_name")
                    result["audio_channels"] = stream.get("channels")
        except Exception as e:
            # Best effort: keep whatever was collected and record the failure.
            result["probe_error"] = str(e)

    @staticmethod
    def _detect_scenes(video_path: str, result: dict) -> None:
        """Add a ``scenes`` list (start/end/duration in seconds) to *result*."""
        try:
            from scenedetect import open_video, SceneManager
            from scenedetect.detectors import ContentDetector

            video = open_video(video_path)
            scene_manager = SceneManager()
            scene_manager.add_detector(ContentDetector(threshold=27))
            scene_manager.detect_scenes(video)

            result["scenes"] = []
            for index, (start, end) in enumerate(scene_manager.get_scene_list(), start=1):
                result["scenes"].append({
                    "scene": index,
                    "start": round(start.get_seconds(), 2),
                    "end": round(end.get_seconds(), 2),
                    "duration": round((end - start).get_seconds(), 2),
                })
        except ImportError:
            result["scenes_note"] = "Install scenedetect: pip install scenedetect[opencv]"
        except Exception as e:
            result["scenes_error"] = str(e)

    @staticmethod
    def _transcribe(video_path: str, result: dict) -> None:
        """Add a Whisper transcript (text and up to 50 segments) to *result*."""
        try:
            import whisper

            model = whisper.load_model("base")
            transcript = model.transcribe(video_path)
            result["transcript"] = transcript.get("text", "")[:2000]
            result["segments"] = [
                {"start": s["start"], "end": s["end"], "text": s["text"]}
                for s in transcript.get("segments", [])[:50]
            ]
        except ImportError:
            result["transcript_note"] = "Install whisper: pip install openai-whisper"
        except Exception as e:
            result["transcript_error"] = str(e)

    @staticmethod
    def analyze(video_path: str, analysis_type: str = "full") -> str:
        """Analyze a video file and return the findings as a JSON string.

        Args:
            video_path: Path of the file to inspect.
            analysis_type: ``"full"`` or ``"scenes"`` adds scene detection;
                ``"full"``, ``"transcript"`` or ``"audio"`` adds a transcript.
                ffprobe metadata is collected for every value.

        Returns:
            A JSON object. A missing file yields ``{"error": ...}``; a failing
            stage is reported under its own ``*_error`` / ``*_note`` key while
            the other stages still run.
        """
        print(f" 📊 Analyzing: {video_path} ({analysis_type})")

        if not os.path.exists(video_path):
            return json.dumps({"error": f"File not found: {video_path}"})

        result = {}
        VideoAnalyzer._probe(video_path, result)

        if analysis_type in ("full", "scenes"):
            VideoAnalyzer._detect_scenes(video_path, result)

        if analysis_type in ("full", "transcript", "audio"):
            VideoAnalyzer._transcribe(video_path, result)

        return json.dumps(result)
|
|
|
|
class ViralityScorer:
    """Score video content for viral potential."""

    # Preferred clip length in seconds per platform: (min, max).
    _OPTIMAL_RANGES = {
        "tiktok": (7, 30),
        "instagram_reels": (15, 30),
        "youtube_shorts": (30, 60),
    }
    # Used for any platform name not listed above.
    _DEFAULT_RANGE = (15, 60)

    @staticmethod
    def _probe(video_path: str) -> dict:
        """Return ffprobe's JSON description of the file, or ``{}`` on failure."""
        try:
            # Argument list (no shell) so the path is never shell-interpreted.
            probe = subprocess.run(
                [
                    "ffprobe", "-v", "quiet", "-print_format", "json",
                    "-show_format", "-show_streams", video_path,
                ],
                capture_output=True, text=True,
            )
            if probe.returncode != 0:
                return {}
            info = json.loads(probe.stdout)
        except (OSError, ValueError):
            # ffprobe missing / not runnable, or output that is not valid JSON.
            return {}
        return info if isinstance(info, dict) else {}

    @staticmethod
    def _score_probe(info: dict, platform: str) -> dict:
        """Compute the heuristic score report from ffprobe metadata."""
        try:
            duration = float(info.get("format", {}).get("duration", 0))
        except (TypeError, ValueError):
            # Non-numeric duration: treat as unknown.
            duration = 0.0

        streams = info.get("streams", [])
        has_audio = any(s.get("codec_type") == "audio" for s in streams)
        opt_min, opt_max = ViralityScorer._OPTIMAL_RANGES.get(
            platform, ViralityScorer._DEFAULT_RANGE
        )

        scores = {}

        # Length: full marks inside the range, linear penalty outside it.
        if opt_min <= duration <= opt_max:
            scores["length_optimal"] = 90
        elif duration < opt_min:
            scores["length_optimal"] = max(50, 90 - (opt_min - duration) * 5)
        else:
            scores["length_optimal"] = max(40, 90 - (duration - opt_max) * 3)

        scores["audio_match"] = 80 if has_audio else 30

        for s in streams:
            if s.get("codec_type") != "video":
                continue
            h = int(s.get("height") or 0)
            w = int(s.get("width") or 0)

            # NOTE(review): `w >= 1080` also matches 1280x720 landscape clips,
            # so they receive 85 — confirm this is the intended threshold.
            if h >= 1920 or w >= 1080:
                scores["visual_quality"] = 85
            elif h >= 1080:
                scores["visual_quality"] = 75
            else:
                scores["visual_quality"] = 55

            # Taller-than-wide frames count as matching the vertical format.
            scores["format_match"] = 90 if h > w else 50

        scores.setdefault("visual_quality", 60)
        scores.setdefault("format_match", 60)
        # Fixed placeholder values; nothing in the file is measured for these.
        scores["hook_strength"] = 70
        scores["pacing"] = 70
        scores["trend_alignment"] = 65

        overall = round(sum(scores.values()) / len(scores))

        suggestions = []
        if scores.get("format_match", 0) < 70:
            suggestions.append("Convert to 9:16 vertical format for better reach")
        if scores.get("length_optimal", 0) < 70:
            suggestions.append(f"Adjust length to {opt_min}-{opt_max}s for {platform}")
        if not has_audio:
            suggestions.append("Add audio - videos without sound get 40% less reach")

        return {
            "overall_score": overall,
            "breakdown": scores,
            "suggestions": suggestions,
        }

    @staticmethod
    def score(video_path: str, platform: str, niche: str = "") -> str:
        """Score a video's viral potential based on heuristics.

        Args:
            video_path: Path of the video to score.
            platform: ``"tiktok"``, ``"instagram_reels"`` or
                ``"youtube_shorts"``; any other value uses a 15-60 s range.
            niche: Accepted for interface compatibility; currently unused.

        Returns:
            A JSON string with ``overall_score``, ``breakdown`` and
            ``suggestions``, or ``{"error": ...}`` when the file is missing.
        """
        print(f" 📈 Scoring virality: {video_path} for {platform}")

        # Without this check a missing file would still get a score built
        # entirely from default values.
        if not os.path.exists(video_path):
            return json.dumps({"error": f"File not found: {video_path}"})

        info = ViralityScorer._probe(video_path)
        return json.dumps(ViralityScorer._score_probe(info, platform))
|
|
|
|
class CaptionGenerator:
    """Generate platform-optimized captions."""

    # Maximum number of hashtags derived from the description.
    _MAX_NICHE_TAGS = 3

    @staticmethod
    def generate(video_description: str, platform: str, tone: str = "casual", include_cta: bool = True) -> str:
        """Build caption metadata (placeholder caption, hashtags, posting time).

        Args:
            video_description: Free-text description; its words seed the
                niche hashtags.
            platform: ``"tiktok"``, ``"instagram"`` or ``"youtube"``; other
                values fall back to generic defaults.
            tone: Accepted for interface compatibility; currently unused.
            include_cta: Accepted for interface compatibility; currently unused.

        Returns:
            A JSON string with ``caption``, ``hashtags``, ``posting_time``
            and ``tip``.
        """
        print(f" ✍️ Generating caption for {platform}")

        hashtag_sets = {
            "tiktok": ["#fyp", "#viral", "#foryou", "#trending"],
            "instagram": ["#reels", "#explore", "#instagood", "#trending"],
            "youtube": ["#shorts", "#subscribe", "#viral"],
        }
        posting_times = {
            "tiktok": "7-9am, 12-1pm, or 7-9pm in your audience timezone",
            "instagram": "6-9am, 12-2pm, or 5-7pm EST",
            "youtube": "2-4pm or 8-10pm EST",
        }

        base_tags = hashtag_sets.get(platform, ["#viral"])

        # Derive niche tags from alphabetic words longer than three letters,
        # skipping any tag already present so the output has no duplicates.
        seen = set(base_tags)
        niche_tags = []
        for word in (video_description or "").lower().split():
            tag = f"#{word}"
            if len(word) > 3 and word.isalpha() and tag not in seen:
                seen.add(tag)
                niche_tags.append(tag)
                if len(niche_tags) == CaptionGenerator._MAX_NICHE_TAGS:
                    break

        return json.dumps({
            "caption": f"[AI will generate based on: {video_description}]",
            "hashtags": " ".join(base_tags + niche_tags),
            "posting_time": posting_times.get(platform, "Check your analytics"),
            "tip": "Reply to every comment in the first hour - algorithm loves engagement",
        })
|
|
|
|
class AIDetector:
    """Detect AI-generated content."""

    @staticmethod
    def detect(content_path: str, check_type: str = "video") -> str:
        """Return a basic file report plus a manual-review checklist as JSON.

        Only the file's existence and size are inspected. A missing file
        yields ``{"error": ...}``.
        """
        print(f" 🔬 Checking for AI artifacts: {content_path}")

        if os.path.exists(content_path):
            size_bytes = os.path.getsize(content_path)
            checklist = [
                "Check for morphing objects between frames",
                "Look for impossible reflections or shadows",
                "Verify text is readable and consistent",
                "Check if camera movement is unnaturally smooth",
            ]
            report = {
                "file_analyzed": content_path,
                "check_type": check_type,
                "file_size_mb": round(size_bytes / 1024 / 1024, 2),
                "note": "Full AI detection requires DeMamba or VideoScore2 model. Basic file analysis only.",
                "recommendations": checklist,
            }
        else:
            report = {"error": f"File not found: {content_path}"}

        return json.dumps(report)
|
|
|
|
| |
| |
| |
|
|
def _call_ffmpeg(args):
    """Forward a decoded argument dict to FFmpegTool.run."""
    return FFmpegTool.run(**args)


def _call_web_search(args):
    """Forward a decoded argument dict to WebSearchTool.search."""
    return WebSearchTool.search(**args)


def _call_analyze_video(args):
    """Forward a decoded argument dict to VideoAnalyzer.analyze."""
    return VideoAnalyzer.analyze(**args)


def _call_score_virality(args):
    """Forward a decoded argument dict to ViralityScorer.score."""
    return ViralityScorer.score(**args)


def _call_generate_caption(args):
    """Forward a decoded argument dict to CaptionGenerator.generate."""
    return CaptionGenerator.generate(**args)


def _call_detect_ai_slop(args):
    """Forward a decoded argument dict to AIDetector.detect."""
    return AIDetector.detect(**args)


# Dispatch table: tool name -> callable taking the tool's argument dict and
# returning the tool's JSON string result.
TOOL_MAP = {
    "ffmpeg_cmd": _call_ffmpeg,
    "web_search": _call_web_search,
    "analyze_video": _call_analyze_video,
    "score_virality": _call_score_virality,
    "generate_caption": _call_generate_caption,
    "detect_ai_slop": _call_detect_ai_slop,
}
|
|
|
|
class ViralCutAgent:
    """The main agent that orchestrates video editing using the fine-tuned model."""

    _SYSTEM_PROMPT = "You are ViralCut Agent, an autonomous AI video editor and social media content strategist. You transform raw video footage into professional, viral-worthy social media content. Use your tools to analyze, edit, search, and optimize. Think step-by-step. Always use royalty-free content."

    # A tool call is a JSON object wrapped in <tool_call>...</tool_call>.
    _TOOL_CALL_PATTERN = re.compile(r'<tool_call>\s*(\{.*?\})\s*</tool_call>', re.DOTALL)

    # Chat-template markers stripped from the final answer.
    _SPECIAL_TOKENS = ("<|endoftext|>", "<|im_end|>", "<|im_start|>")

    def __init__(self, model_id: str = "ryu34/viralcut-agent", device: str = "auto"):
        """Load the tokenizer and model and register the tool schemas.

        Args:
            model_id: Identifier passed to ``from_pretrained``.
            device: Value passed as ``device_map`` when loading the model.
        """
        print(f"Loading ViralCut Agent from {model_id}...")

        # Imported lazily so the rest of the module works without transformers.
        from transformers import AutoModelForCausalLM, AutoTokenizer

        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            device_map=device,
            torch_dtype="auto",
        )
        self.model.eval()

        # Tool schemas handed to the chat template on every turn.
        self.tools = [
            {"type": "function", "function": {"name": "ffmpeg_cmd", "description": "Execute FFmpeg command for video/audio processing.", "parameters": {"type": "object", "properties": {"command": {"type": "string"}, "description": {"type": "string"}}, "required": ["command", "description"]}}},
            {"type": "function", "function": {"name": "web_search", "description": "Search web for royalty-free assets and trends.", "parameters": {"type": "object", "properties": {"query": {"type": "string"}, "search_type": {"type": "string", "enum": ["royalty_free_music", "sound_effects", "trending_content", "general"]}}, "required": ["query", "search_type"]}}},
            {"type": "function", "function": {"name": "analyze_video", "description": "Analyze video for scenes, audio, transcript, quality.", "parameters": {"type": "object", "properties": {"video_path": {"type": "string"}, "analysis_type": {"type": "string", "enum": ["full", "scenes", "audio", "transcript", "quality", "pacing"]}}, "required": ["video_path", "analysis_type"]}}},
            {"type": "function", "function": {"name": "score_virality", "description": "Score video viral potential 0-100.", "parameters": {"type": "object", "properties": {"video_path": {"type": "string"}, "platform": {"type": "string", "enum": ["tiktok", "instagram_reels", "youtube_shorts"]}, "niche": {"type": "string"}}, "required": ["video_path", "platform"]}}},
            {"type": "function", "function": {"name": "generate_caption", "description": "Generate platform-optimized caption with hashtags.", "parameters": {"type": "object", "properties": {"video_description": {"type": "string"}, "platform": {"type": "string", "enum": ["tiktok", "instagram", "youtube"]}, "tone": {"type": "string"}, "include_cta": {"type": "boolean"}}, "required": ["video_description", "platform"]}}},
            {"type": "function", "function": {"name": "detect_ai_slop", "description": "Check content for AI-generated artifacts.", "parameters": {"type": "object", "properties": {"content_path": {"type": "string"}, "check_type": {"type": "string", "enum": ["video", "image", "text", "audio"]}}, "required": ["content_path", "check_type"]}}}
        ]

        print("Agent ready!")

    def run(self, user_message: str, max_turns: int = 15):
        """Run the agent on a user request, executing tool calls autonomously.

        Each turn generates a reply. If it contains tool calls they are
        executed and their results appended to the conversation; otherwise
        the reply is the final answer and the loop ends.

        Args:
            user_message: The user's request.
            max_turns: Maximum number of generation turns.

        Returns:
            The full message list (system, user, assistant and tool messages).
        """
        import torch  # local import: only needed once inference actually runs

        messages = [
            {"role": "system", "content": self._SYSTEM_PROMPT},
            {"role": "user", "content": user_message},
        ]

        print(f"\n{'='*60}")
        print(f"🎬 ViralCut Agent")
        print(f"{'='*60}")
        print(f"User: {user_message}\n")

        for _ in range(max_turns):
            prompt = self.tokenizer.apply_chat_template(
                messages, tools=self.tools, tokenize=False, add_generation_prompt=True
            )
            inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=1024,
                    temperature=0.7,
                    top_p=0.9,
                    do_sample=True,
                )

            # Decode only the newly generated tokens, not the echoed prompt.
            response = self.tokenizer.decode(
                outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=False
            )

            tool_calls = self._parse_tool_calls(response)

            if not tool_calls:
                clean = self._clean_response(response)
                messages.append({"role": "assistant", "content": clean})
                print(f"\n🤖 Agent: {clean}")
                break

            # NOTE(review): "arguments" is stored as a JSON string; some chat
            # templates expect a dict here — confirm against the model's template.
            messages.append({"role": "assistant", "tool_calls": tool_calls})

            for tc in tool_calls:
                func_name = tc["function"]["name"]
                print(f"\n 🔧 Calling: {func_name}")

                result = self._execute_tool(func_name, tc["function"]["arguments"])

                messages.append({"role": "tool", "name": func_name, "content": result})
                print(f" ✅ Result: {result[:200]}...")
        else:
            # Every turn ended in tool calls, so there is no final answer.
            print(f"\n⚠️ Stopped after {max_turns} turns without a final answer.")

        return messages

    @staticmethod
    def _decode_arguments(raw_arguments):
        """Return the decoded argument dict, or ``None`` if it is not a JSON object."""
        try:
            decoded = json.loads(raw_arguments)
        except (TypeError, ValueError):
            return None
        return decoded if isinstance(decoded, dict) else None

    def _execute_tool(self, func_name: str, raw_arguments: str) -> str:
        """Run one tool call and return its JSON string result.

        Unknown tools, malformed arguments and exceptions raised by the tool
        are returned as ``{"error": ...}`` so the model can react to them
        instead of the whole run aborting.
        """
        handler = TOOL_MAP.get(func_name)
        if handler is None:
            return json.dumps({"error": f"Unknown tool: {func_name}"})

        args = self._decode_arguments(raw_arguments)
        if args is None:
            return json.dumps(
                {"error": f"Invalid arguments for {func_name}: expected a JSON object"}
            )

        try:
            return handler(args)
        except Exception as exc:
            # Tool boundary: e.g. missing or unexpected keyword arguments
            # supplied by the model surface here as TypeError.
            return json.dumps({"error": f"{func_name} failed: {exc}"})

    def _parse_tool_calls(self, response: str) -> list:
        """Extract tool calls from model output.

        Returns one entry per ``<tool_call>`` block that contains valid JSON,
        shaped as ``{"type": "function", "function": {"name", "arguments"}}``
        with ``arguments`` serialized as a JSON string. Blocks whose payload
        is not valid JSON are skipped.
        """
        tool_calls = []
        for payload in self._TOOL_CALL_PATTERN.findall(response):
            try:
                data = json.loads(payload)
            except json.JSONDecodeError:
                continue
            tool_calls.append({
                "type": "function",
                "function": {
                    "name": data.get("name", ""),
                    "arguments": json.dumps(data.get("arguments", {})),
                },
            })
        return tool_calls

    def _clean_response(self, response: str) -> str:
        """Strip chat-template special tokens and surrounding whitespace."""
        for token in self._SPECIAL_TOKENS:
            response = response.replace(token, "")
        return response.strip()
|
|
|
|
| |
| |
| |
|
|
def main():
    """Command-line entry point.

    Modes, in order of precedence:
      * ``--check-slop FILE...``: print a basic report per file and exit.
      * ``--plan``: ask the agent for a 7-day content plan.
      * ``--video PATH``: ask the agent to edit the given footage.
      * otherwise: interactive prompt loop.
    """
    parser = argparse.ArgumentParser(description="ViralCut Agent - AI Video Editor")
    parser.add_argument("--video", type=str, help="Path to raw video file")
    parser.add_argument("--platform", type=str, default="tiktok",
                        choices=["tiktok", "instagram", "youtube"],
                        help="Target platform")
    parser.add_argument("--niche", type=str, default="", help="Content niche")
    parser.add_argument("--plan", action="store_true", help="Generate content plan only (no video needed)")
    parser.add_argument("--model", type=str, default="ryu34/viralcut-agent", help="Model ID")
    parser.add_argument("--check-slop", type=str, nargs="+", help="Check files for AI-generated content")

    args = parser.parse_args()

    if args.check_slop:
        # File inspection only; no model is loaded for this mode.
        for path in args.check_slop:
            result = AIDetector.detect(path, "video")
            print(json.dumps(json.loads(result), indent=2))
        return

    # Fail fast: validate the video path BEFORE loading the model. --plan
    # takes precedence over --video, so the path only matters without it.
    if args.video and not args.plan and not os.path.exists(args.video):
        print(f"Error: Video file not found: {args.video}")
        sys.exit(1)

    agent = ViralCutAgent(model_id=args.model)

    if args.plan:
        niche = args.niche or "general"
        agent.run(f"Research current {args.platform} trends for the '{niche}' niche and create a detailed 7-day content plan with hooks, posting times, and viral strategies.")
    elif args.video:
        niche_str = f" in the {args.niche} niche" if args.niche else ""
        agent.run(f"I have raw footage at {args.video}. Transform it into a professional, viral {args.platform} video{niche_str}. Analyze it, find the best moments, add trending music, professional edits, and optimize for maximum engagement.")
    else:
        print("ViralCut Agent - Interactive Mode")
        print("Type your request (or 'quit' to exit):\n")
        while True:
            try:
                user_input = input("You: ").strip()
            except (KeyboardInterrupt, EOFError):
                break
            if user_input.lower() in ("quit", "exit", "q"):
                break
            if user_input:
                try:
                    agent.run(user_input)
                except KeyboardInterrupt:
                    # Ctrl-C during a run ends the session, as before.
                    break


if __name__ == "__main__":
    main()
|
|