""" ViralCut Agent - Runtime ======================== The actual agent that uses the fine-tuned model to edit videos autonomously. This connects the trained model to real tools: - FFmpeg for video editing - DuckDuckGo for web search (free, no API key) - Whisper for transcription - PySceneDetect for shot detection Usage: python agent.py --video raw_footage.mp4 --platform tiktok --niche food python agent.py --plan --niche "coffee shop" --platform tiktok """ import argparse import json import os import re import subprocess import sys import tempfile from pathlib import Path # ============================================================ # TOOL IMPLEMENTATIONS # ============================================================ class FFmpegTool: """Execute FFmpeg commands for video/audio processing.""" @staticmethod def run(command: str, description: str = "") -> str: """Execute an FFmpeg command and return result.""" print(f" šŸŽ¬ FFmpeg: {description}") print(f" $ {command}") try: result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=120 ) if result.returncode == 0: return json.dumps({"status": "success", "message": f"Command completed: {description}"}) else: return json.dumps({"status": "error", "message": result.stderr[:500]}) except subprocess.TimeoutExpired: return json.dumps({"status": "error", "message": "Command timed out after 120s"}) except Exception as e: return json.dumps({"status": "error", "message": str(e)}) class WebSearchTool: """Search the web using DuckDuckGo (free, no API key needed).""" @staticmethod def search(query: str, search_type: str = "general") -> str: """Search the web and return results.""" print(f" šŸ” Searching: {query} (type: {search_type})") try: from duckduckgo_search import DDGS with DDGS() as ddgs: results = [] for r in ddgs.text(query, max_results=5): results.append({ "title": r.get("title", ""), "url": r.get("href", ""), "description": r.get("body", "")[:200] }) return json.dumps({"results": results}) except ImportError: return json.dumps({"results": [{"title": "Install duckduckgo-search", "description": "pip install duckduckgo-search"}]}) except Exception as e: return json.dumps({"results": [], "error": str(e)}) class VideoAnalyzer: """Analyze video files using ffprobe and PySceneDetect.""" @staticmethod def analyze(video_path: str, analysis_type: str = "full") -> str: """Analyze a video file.""" print(f" šŸ“Š Analyzing: {video_path} ({analysis_type})") if not os.path.exists(video_path): return json.dumps({"error": f"File not found: {video_path}"}) result = {} # Get basic info via ffprobe try: probe = subprocess.run( f'ffprobe -v quiet -print_format json -show_format -show_streams "{video_path}"', shell=True, capture_output=True, text=True ) if probe.returncode == 0: info = json.loads(probe.stdout) fmt = info.get("format", {}) result["duration"] = float(fmt.get("duration", 0)) result["size_mb"] = round(int(fmt.get("size", 0)) / 1024 / 1024, 1) for stream in info.get("streams", []): if stream.get("codec_type") == "video": result["resolution"] = f"{stream.get('width')}x{stream.get('height')}" result["fps"] = eval(stream.get("r_frame_rate", "30/1")) result["codec"] = stream.get("codec_name") elif stream.get("codec_type") == "audio": result["audio_codec"] = stream.get("codec_name") result["audio_channels"] = stream.get("channels") except Exception as e: result["probe_error"] = str(e) # Scene detection if analysis_type in ("full", "scenes"): try: from scenedetect import open_video, SceneManager from scenedetect.detectors import ContentDetector video = open_video(video_path) scene_manager = SceneManager() scene_manager.add_detector(ContentDetector(threshold=27)) scene_manager.detect_scenes(video) scene_list = scene_manager.get_scene_list() result["scenes"] = [] for i, (start, end) in enumerate(scene_list): result["scenes"].append({ "scene": i + 1, "start": round(start.get_seconds(), 2), "end": round(end.get_seconds(), 2), "duration": round((end - start).get_seconds(), 2) }) except ImportError: result["scenes_note"] = "Install scenedetect: pip install scenedetect[opencv]" except Exception as e: result["scenes_error"] = str(e) # Transcript via Whisper if analysis_type in ("full", "transcript", "audio"): try: import whisper model = whisper.load_model("base") transcript = model.transcribe(video_path) result["transcript"] = transcript.get("text", "")[:2000] result["segments"] = [ {"start": s["start"], "end": s["end"], "text": s["text"]} for s in transcript.get("segments", [])[:50] ] except ImportError: result["transcript_note"] = "Install whisper: pip install openai-whisper" except Exception as e: result["transcript_error"] = str(e) return json.dumps(result) class ViralityScorer: """Score video content for viral potential.""" @staticmethod def score(video_path: str, platform: str, niche: str = "") -> str: """Score a video's viral potential based on heuristics.""" print(f" šŸ“ˆ Scoring virality: {video_path} for {platform}") # Get video info try: probe = subprocess.run( f'ffprobe -v quiet -print_format json -show_format -show_streams "{video_path}"', shell=True, capture_output=True, text=True ) info = json.loads(probe.stdout) if probe.returncode == 0 else {} except: info = {} duration = float(info.get("format", {}).get("duration", 0)) has_audio = any(s.get("codec_type") == "audio" for s in info.get("streams", [])) # Platform-specific optimal durations optimal_ranges = { "tiktok": (7, 30), "instagram_reels": (15, 30), "youtube_shorts": (30, 60) } opt_min, opt_max = optimal_ranges.get(platform, (15, 60)) # Score components scores = {} # Length score if opt_min <= duration <= opt_max: scores["length_optimal"] = 90 elif duration < opt_min: scores["length_optimal"] = max(50, 90 - (opt_min - duration) * 5) else: scores["length_optimal"] = max(40, 90 - (duration - opt_max) * 3) # Audio presence scores["audio_match"] = 80 if has_audio else 30 # Resolution check for s in info.get("streams", []): if s.get("codec_type") == "video": h = int(s.get("height", 0)) w = int(s.get("width", 0)) if h >= 1920 or w >= 1080: scores["visual_quality"] = 85 elif h >= 1080: scores["visual_quality"] = 75 else: scores["visual_quality"] = 55 # Vertical check if h > w: scores["format_match"] = 90 else: scores["format_match"] = 50 scores.setdefault("visual_quality", 60) scores.setdefault("format_match", 60) scores["hook_strength"] = 70 # Can't assess without content analysis scores["pacing"] = 70 scores["trend_alignment"] = 65 overall = round(sum(scores.values()) / len(scores)) suggestions = [] if scores.get("format_match", 0) < 70: suggestions.append("Convert to 9:16 vertical format for better reach") if scores.get("length_optimal", 0) < 70: suggestions.append(f"Adjust length to {opt_min}-{opt_max}s for {platform}") if not has_audio: suggestions.append("Add audio - videos without sound get 40% less reach") return json.dumps({ "overall_score": overall, "breakdown": scores, "suggestions": suggestions }) class CaptionGenerator: """Generate platform-optimized captions.""" @staticmethod def generate(video_description: str, platform: str, tone: str = "casual", include_cta: bool = True) -> str: """Generate a caption (using the model itself for this in production).""" print(f" āœļø Generating caption for {platform}") hashtag_sets = { "tiktok": ["#fyp", "#viral", "#foryou", "#trending"], "instagram": ["#reels", "#explore", "#instagood", "#trending"], "youtube": ["#shorts", "#subscribe", "#viral"] } base_tags = hashtag_sets.get(platform, ["#viral"]) # Extract keywords from description for niche hashtags words = video_description.lower().split() niche_tags = [f"#{w}" for w in words if len(w) > 3 and w.isalpha()][:3] posting_times = { "tiktok": "7-9am, 12-1pm, or 7-9pm in your audience timezone", "instagram": "6-9am, 12-2pm, or 5-7pm EST", "youtube": "2-4pm or 8-10pm EST" } return json.dumps({ "caption": f"[AI will generate based on: {video_description}]", "hashtags": " ".join(base_tags + niche_tags), "posting_time": posting_times.get(platform, "Check your analytics"), "tip": "Reply to every comment in the first hour - algorithm loves engagement" }) class AIDetector: """Detect AI-generated content.""" @staticmethod def detect(content_path: str, check_type: str = "video") -> str: """Basic AI content detection heuristics.""" print(f" šŸ”¬ Checking for AI artifacts: {content_path}") if not os.path.exists(content_path): return json.dumps({"error": f"File not found: {content_path}"}) # Basic file analysis (real detection would use a classifier model) size = os.path.getsize(content_path) return json.dumps({ "file_analyzed": content_path, "check_type": check_type, "file_size_mb": round(size / 1024 / 1024, 2), "note": "Full AI detection requires DeMamba or VideoScore2 model. Basic file analysis only.", "recommendations": [ "Check for morphing objects between frames", "Look for impossible reflections or shadows", "Verify text is readable and consistent", "Check if camera movement is unnaturally smooth" ] }) # ============================================================ # AGENT CORE # ============================================================ TOOL_MAP = { "ffmpeg_cmd": lambda args: FFmpegTool.run(**args), "web_search": lambda args: WebSearchTool.search(**args), "analyze_video": lambda args: VideoAnalyzer.analyze(**args), "score_virality": lambda args: ViralityScorer.score(**args), "generate_caption": lambda args: CaptionGenerator.generate(**args), "detect_ai_slop": lambda args: AIDetector.detect(**args), } class ViralCutAgent: """The main agent that orchestrates video editing using the fine-tuned model.""" def __init__(self, model_id="ryu34/viralcut-agent", device="auto"): print(f"Loading ViralCut Agent from {model_id}...") from transformers import AutoModelForCausalLM, AutoTokenizer self.tokenizer = AutoTokenizer.from_pretrained(model_id) self.model = AutoModelForCausalLM.from_pretrained( model_id, device_map=device, torch_dtype="auto", ) self.model.eval() # Tool definitions for the chat template self.tools = [ {"type": "function", "function": {"name": "ffmpeg_cmd", "description": "Execute FFmpeg command for video/audio processing.", "parameters": {"type": "object", "properties": {"command": {"type": "string"}, "description": {"type": "string"}}, "required": ["command", "description"]}}}, {"type": "function", "function": {"name": "web_search", "description": "Search web for royalty-free assets and trends.", "parameters": {"type": "object", "properties": {"query": {"type": "string"}, "search_type": {"type": "string", "enum": ["royalty_free_music", "sound_effects", "trending_content", "general"]}}, "required": ["query", "search_type"]}}}, {"type": "function", "function": {"name": "analyze_video", "description": "Analyze video for scenes, audio, transcript, quality.", "parameters": {"type": "object", "properties": {"video_path": {"type": "string"}, "analysis_type": {"type": "string", "enum": ["full", "scenes", "audio", "transcript", "quality", "pacing"]}}, "required": ["video_path", "analysis_type"]}}}, {"type": "function", "function": {"name": "score_virality", "description": "Score video viral potential 0-100.", "parameters": {"type": "object", "properties": {"video_path": {"type": "string"}, "platform": {"type": "string", "enum": ["tiktok", "instagram_reels", "youtube_shorts"]}, "niche": {"type": "string"}}, "required": ["video_path", "platform"]}}}, {"type": "function", "function": {"name": "generate_caption", "description": "Generate platform-optimized caption with hashtags.", "parameters": {"type": "object", "properties": {"video_description": {"type": "string"}, "platform": {"type": "string", "enum": ["tiktok", "instagram", "youtube"]}, "tone": {"type": "string"}, "include_cta": {"type": "boolean"}}, "required": ["video_description", "platform"]}}}, {"type": "function", "function": {"name": "detect_ai_slop", "description": "Check content for AI-generated artifacts.", "parameters": {"type": "object", "properties": {"content_path": {"type": "string"}, "check_type": {"type": "string", "enum": ["video", "image", "text", "audio"]}}, "required": ["content_path", "check_type"]}}} ] print("Agent ready!") def run(self, user_message: str, max_turns: int = 15): """Run the agent on a user request, executing tool calls autonomously.""" messages = [ {"role": "system", "content": "You are ViralCut Agent, an autonomous AI video editor and social media content strategist. You transform raw video footage into professional, viral-worthy social media content. Use your tools to analyze, edit, search, and optimize. Think step-by-step. Always use royalty-free content."}, {"role": "user", "content": user_message} ] print(f"\n{'='*60}") print(f"šŸŽ¬ ViralCut Agent") print(f"{'='*60}") print(f"User: {user_message}\n") for turn in range(max_turns): # Generate response text = self.tokenizer.apply_chat_template( messages, tools=self.tools, tokenize=False, add_generation_prompt=True ) inputs = self.tokenizer(text, return_tensors="pt").to(self.model.device) with __import__("torch").no_grad(): outputs = self.model.generate( **inputs, max_new_tokens=1024, temperature=0.7, top_p=0.9, do_sample=True, ) response = self.tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=False) # Parse response for tool calls or plain text tool_calls = self._parse_tool_calls(response) if tool_calls: # Add assistant message with tool calls messages.append({"role": "assistant", "tool_calls": tool_calls}) # Execute each tool call for tc in tool_calls: func_name = tc["function"]["name"] try: args = json.loads(tc["function"]["arguments"]) except: args = {} print(f"\n šŸ”§ Calling: {func_name}") if func_name in TOOL_MAP: result = TOOL_MAP[func_name](args) else: result = json.dumps({"error": f"Unknown tool: {func_name}"}) messages.append({"role": "tool", "name": func_name, "content": result}) print(f" āœ… Result: {result[:200]}...") else: # Plain text response - agent is done clean = self._clean_response(response) messages.append({"role": "assistant", "content": clean}) print(f"\nšŸ¤– Agent: {clean}") break return messages def _parse_tool_calls(self, response: str) -> list: """Parse tool calls from model output.""" tool_calls = [] # Qwen tool call format: {"name": "...", "arguments": {...}} pattern = r'\s*(\{.*?\})\s*' matches = re.findall(pattern, response, re.DOTALL) for match in matches: try: data = json.loads(match) tool_calls.append({ "type": "function", "function": { "name": data.get("name", ""), "arguments": json.dumps(data.get("arguments", {})) } }) except json.JSONDecodeError: continue return tool_calls def _clean_response(self, response: str) -> str: """Clean up model response.""" # Remove special tokens for token in ["<|endoftext|>", "<|im_end|>", "<|im_start|>"]: response = response.replace(token, "") return response.strip() # ============================================================ # CLI # ============================================================ def main(): parser = argparse.ArgumentParser(description="ViralCut Agent - AI Video Editor") parser.add_argument("--video", type=str, help="Path to raw video file") parser.add_argument("--platform", type=str, default="tiktok", choices=["tiktok", "instagram", "youtube"], help="Target platform") parser.add_argument("--niche", type=str, default="", help="Content niche") parser.add_argument("--plan", action="store_true", help="Generate content plan only (no video needed)") parser.add_argument("--model", type=str, default="ryu34/viralcut-agent", help="Model ID") parser.add_argument("--check-slop", type=str, nargs="+", help="Check files for AI-generated content") args = parser.parse_args() if args.check_slop: # Quick AI slop check without loading the full model for f in args.check_slop: result = AIDetector.detect(f, "video") print(json.dumps(json.loads(result), indent=2)) return agent = ViralCutAgent(model_id=args.model) if args.plan: niche = args.niche or "general" agent.run(f"Research current {args.platform} trends for the '{niche}' niche and create a detailed 7-day content plan with hooks, posting times, and viral strategies.") elif args.video: if not os.path.exists(args.video): print(f"Error: Video file not found: {args.video}") sys.exit(1) niche_str = f" in the {args.niche} niche" if args.niche else "" agent.run(f"I have raw footage at {args.video}. Transform it into a professional, viral {args.platform} video{niche_str}. Analyze it, find the best moments, add trending music, professional edits, and optimize for maximum engagement.") else: # Interactive mode print("ViralCut Agent - Interactive Mode") print("Type your request (or 'quit' to exit):\n") while True: try: user_input = input("You: ").strip() if user_input.lower() in ("quit", "exit", "q"): break if user_input: agent.run(user_input) except (KeyboardInterrupt, EOFError): break if __name__ == "__main__": main()