"""
ViralCut Agent - Runtime
========================
The actual agent that uses the fine-tuned model to edit videos autonomously.
This connects the trained model to real tools:
- FFmpeg for video editing
- DuckDuckGo for web search (free, no API key)
- Whisper for transcription
- PySceneDetect for shot detection
Usage:
python agent.py --video raw_footage.mp4 --platform tiktok --niche food
python agent.py --plan --niche "coffee shop" --platform tiktok
"""
import argparse
import json
import os
import re
import subprocess
import sys
import tempfile
from pathlib import Path
# ============================================================
# TOOL IMPLEMENTATIONS
# ============================================================
class FFmpegTool:
"""Execute FFmpeg commands for video/audio processing."""
@staticmethod
def run(command: str, description: str = "") -> str:
"""Execute an FFmpeg command and return result."""
print(f" š¬ FFmpeg: {description}")
print(f" $ {command}")
try:
result = subprocess.run(
command, shell=True, capture_output=True, text=True, timeout=120
)
if result.returncode == 0:
return json.dumps({"status": "success", "message": f"Command completed: {description}"})
else:
return json.dumps({"status": "error", "message": result.stderr[:500]})
except subprocess.TimeoutExpired:
return json.dumps({"status": "error", "message": "Command timed out after 120s"})
except Exception as e:
return json.dumps({"status": "error", "message": str(e)})
class WebSearchTool:
"""Search the web using DuckDuckGo (free, no API key needed)."""
@staticmethod
def search(query: str, search_type: str = "general") -> str:
"""Search the web and return results."""
print(f" š Searching: {query} (type: {search_type})")
try:
from duckduckgo_search import DDGS
with DDGS() as ddgs:
results = []
for r in ddgs.text(query, max_results=5):
results.append({
"title": r.get("title", ""),
"url": r.get("href", ""),
"description": r.get("body", "")[:200]
})
return json.dumps({"results": results})
except ImportError:
return json.dumps({"results": [{"title": "Install duckduckgo-search", "description": "pip install duckduckgo-search"}]})
except Exception as e:
return json.dumps({"results": [], "error": str(e)})
class VideoAnalyzer:
"""Analyze video files using ffprobe and PySceneDetect."""
@staticmethod
def analyze(video_path: str, analysis_type: str = "full") -> str:
"""Analyze a video file."""
print(f" š Analyzing: {video_path} ({analysis_type})")
if not os.path.exists(video_path):
return json.dumps({"error": f"File not found: {video_path}"})
result = {}
# Get basic info via ffprobe
try:
probe = subprocess.run(
f'ffprobe -v quiet -print_format json -show_format -show_streams "{video_path}"',
shell=True, capture_output=True, text=True
)
if probe.returncode == 0:
info = json.loads(probe.stdout)
fmt = info.get("format", {})
result["duration"] = float(fmt.get("duration", 0))
result["size_mb"] = round(int(fmt.get("size", 0)) / 1024 / 1024, 1)
for stream in info.get("streams", []):
if stream.get("codec_type") == "video":
result["resolution"] = f"{stream.get('width')}x{stream.get('height')}"
result["fps"] = eval(stream.get("r_frame_rate", "30/1"))
result["codec"] = stream.get("codec_name")
elif stream.get("codec_type") == "audio":
result["audio_codec"] = stream.get("codec_name")
result["audio_channels"] = stream.get("channels")
except Exception as e:
result["probe_error"] = str(e)
# Scene detection
if analysis_type in ("full", "scenes"):
try:
from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector
video = open_video(video_path)
scene_manager = SceneManager()
scene_manager.add_detector(ContentDetector(threshold=27))
scene_manager.detect_scenes(video)
scene_list = scene_manager.get_scene_list()
result["scenes"] = []
for i, (start, end) in enumerate(scene_list):
result["scenes"].append({
"scene": i + 1,
"start": round(start.get_seconds(), 2),
"end": round(end.get_seconds(), 2),
"duration": round((end - start).get_seconds(), 2)
})
except ImportError:
result["scenes_note"] = "Install scenedetect: pip install scenedetect[opencv]"
except Exception as e:
result["scenes_error"] = str(e)
# Transcript via Whisper
if analysis_type in ("full", "transcript", "audio"):
try:
import whisper
model = whisper.load_model("base")
transcript = model.transcribe(video_path)
result["transcript"] = transcript.get("text", "")[:2000]
result["segments"] = [
{"start": s["start"], "end": s["end"], "text": s["text"]}
for s in transcript.get("segments", [])[:50]
]
except ImportError:
result["transcript_note"] = "Install whisper: pip install openai-whisper"
except Exception as e:
result["transcript_error"] = str(e)
return json.dumps(result)
class ViralityScorer:
"""Score video content for viral potential."""
@staticmethod
def score(video_path: str, platform: str, niche: str = "") -> str:
"""Score a video's viral potential based on heuristics."""
print(f" š Scoring virality: {video_path} for {platform}")
# Get video info
try:
probe = subprocess.run(
f'ffprobe -v quiet -print_format json -show_format -show_streams "{video_path}"',
shell=True, capture_output=True, text=True
)
info = json.loads(probe.stdout) if probe.returncode == 0 else {}
except:
info = {}
duration = float(info.get("format", {}).get("duration", 0))
has_audio = any(s.get("codec_type") == "audio" for s in info.get("streams", []))
# Platform-specific optimal durations
optimal_ranges = {
"tiktok": (7, 30),
"instagram_reels": (15, 30),
"youtube_shorts": (30, 60)
}
opt_min, opt_max = optimal_ranges.get(platform, (15, 60))
# Score components
scores = {}
# Length score
if opt_min <= duration <= opt_max:
scores["length_optimal"] = 90
elif duration < opt_min:
scores["length_optimal"] = max(50, 90 - (opt_min - duration) * 5)
else:
scores["length_optimal"] = max(40, 90 - (duration - opt_max) * 3)
# Audio presence
scores["audio_match"] = 80 if has_audio else 30
# Resolution check
for s in info.get("streams", []):
if s.get("codec_type") == "video":
h = int(s.get("height", 0))
w = int(s.get("width", 0))
if h >= 1920 or w >= 1080:
scores["visual_quality"] = 85
elif h >= 1080:
scores["visual_quality"] = 75
else:
scores["visual_quality"] = 55
# Vertical check
if h > w:
scores["format_match"] = 90
else:
scores["format_match"] = 50
scores.setdefault("visual_quality", 60)
scores.setdefault("format_match", 60)
scores["hook_strength"] = 70 # Can't assess without content analysis
scores["pacing"] = 70
scores["trend_alignment"] = 65
overall = round(sum(scores.values()) / len(scores))
suggestions = []
if scores.get("format_match", 0) < 70:
suggestions.append("Convert to 9:16 vertical format for better reach")
if scores.get("length_optimal", 0) < 70:
suggestions.append(f"Adjust length to {opt_min}-{opt_max}s for {platform}")
if not has_audio:
suggestions.append("Add audio - videos without sound get 40% less reach")
return json.dumps({
"overall_score": overall,
"breakdown": scores,
"suggestions": suggestions
})
class CaptionGenerator:
"""Generate platform-optimized captions."""
@staticmethod
def generate(video_description: str, platform: str, tone: str = "casual", include_cta: bool = True) -> str:
"""Generate a caption (using the model itself for this in production)."""
print(f" āļø Generating caption for {platform}")
hashtag_sets = {
"tiktok": ["#fyp", "#viral", "#foryou", "#trending"],
"instagram": ["#reels", "#explore", "#instagood", "#trending"],
"youtube": ["#shorts", "#subscribe", "#viral"]
}
base_tags = hashtag_sets.get(platform, ["#viral"])
# Extract keywords from description for niche hashtags
words = video_description.lower().split()
niche_tags = [f"#{w}" for w in words if len(w) > 3 and w.isalpha()][:3]
posting_times = {
"tiktok": "7-9am, 12-1pm, or 7-9pm in your audience timezone",
"instagram": "6-9am, 12-2pm, or 5-7pm EST",
"youtube": "2-4pm or 8-10pm EST"
}
return json.dumps({
"caption": f"[AI will generate based on: {video_description}]",
"hashtags": " ".join(base_tags + niche_tags),
"posting_time": posting_times.get(platform, "Check your analytics"),
"tip": "Reply to every comment in the first hour - algorithm loves engagement"
})
class AIDetector:
"""Detect AI-generated content."""
@staticmethod
def detect(content_path: str, check_type: str = "video") -> str:
"""Basic AI content detection heuristics."""
print(f" š¬ Checking for AI artifacts: {content_path}")
if not os.path.exists(content_path):
return json.dumps({"error": f"File not found: {content_path}"})
# Basic file analysis (real detection would use a classifier model)
size = os.path.getsize(content_path)
return json.dumps({
"file_analyzed": content_path,
"check_type": check_type,
"file_size_mb": round(size / 1024 / 1024, 2),
"note": "Full AI detection requires DeMamba or VideoScore2 model. Basic file analysis only.",
"recommendations": [
"Check for morphing objects between frames",
"Look for impossible reflections or shadows",
"Verify text is readable and consistent",
"Check if camera movement is unnaturally smooth"
]
})
# ============================================================
# AGENT CORE
# ============================================================
TOOL_MAP = {
"ffmpeg_cmd": lambda args: FFmpegTool.run(**args),
"web_search": lambda args: WebSearchTool.search(**args),
"analyze_video": lambda args: VideoAnalyzer.analyze(**args),
"score_virality": lambda args: ViralityScorer.score(**args),
"generate_caption": lambda args: CaptionGenerator.generate(**args),
"detect_ai_slop": lambda args: AIDetector.detect(**args),
}
class ViralCutAgent:
"""The main agent that orchestrates video editing using the fine-tuned model."""
def __init__(self, model_id="ryu34/viralcut-agent", device="auto"):
print(f"Loading ViralCut Agent from {model_id}...")
from transformers import AutoModelForCausalLM, AutoTokenizer
self.tokenizer = AutoTokenizer.from_pretrained(model_id)
self.model = AutoModelForCausalLM.from_pretrained(
model_id,
device_map=device,
torch_dtype="auto",
)
self.model.eval()
# Tool definitions for the chat template
self.tools = [
{"type": "function", "function": {"name": "ffmpeg_cmd", "description": "Execute FFmpeg command for video/audio processing.", "parameters": {"type": "object", "properties": {"command": {"type": "string"}, "description": {"type": "string"}}, "required": ["command", "description"]}}},
{"type": "function", "function": {"name": "web_search", "description": "Search web for royalty-free assets and trends.", "parameters": {"type": "object", "properties": {"query": {"type": "string"}, "search_type": {"type": "string", "enum": ["royalty_free_music", "sound_effects", "trending_content", "general"]}}, "required": ["query", "search_type"]}}},
{"type": "function", "function": {"name": "analyze_video", "description": "Analyze video for scenes, audio, transcript, quality.", "parameters": {"type": "object", "properties": {"video_path": {"type": "string"}, "analysis_type": {"type": "string", "enum": ["full", "scenes", "audio", "transcript", "quality", "pacing"]}}, "required": ["video_path", "analysis_type"]}}},
{"type": "function", "function": {"name": "score_virality", "description": "Score video viral potential 0-100.", "parameters": {"type": "object", "properties": {"video_path": {"type": "string"}, "platform": {"type": "string", "enum": ["tiktok", "instagram_reels", "youtube_shorts"]}, "niche": {"type": "string"}}, "required": ["video_path", "platform"]}}},
{"type": "function", "function": {"name": "generate_caption", "description": "Generate platform-optimized caption with hashtags.", "parameters": {"type": "object", "properties": {"video_description": {"type": "string"}, "platform": {"type": "string", "enum": ["tiktok", "instagram", "youtube"]}, "tone": {"type": "string"}, "include_cta": {"type": "boolean"}}, "required": ["video_description", "platform"]}}},
{"type": "function", "function": {"name": "detect_ai_slop", "description": "Check content for AI-generated artifacts.", "parameters": {"type": "object", "properties": {"content_path": {"type": "string"}, "check_type": {"type": "string", "enum": ["video", "image", "text", "audio"]}}, "required": ["content_path", "check_type"]}}}
]
print("Agent ready!")
def run(self, user_message: str, max_turns: int = 15):
"""Run the agent on a user request, executing tool calls autonomously."""
messages = [
{"role": "system", "content": "You are ViralCut Agent, an autonomous AI video editor and social media content strategist. You transform raw video footage into professional, viral-worthy social media content. Use your tools to analyze, edit, search, and optimize. Think step-by-step. Always use royalty-free content."},
{"role": "user", "content": user_message}
]
print(f"\n{'='*60}")
print(f"š¬ ViralCut Agent")
print(f"{'='*60}")
print(f"User: {user_message}\n")
for turn in range(max_turns):
# Generate response
text = self.tokenizer.apply_chat_template(
messages, tools=self.tools, tokenize=False, add_generation_prompt=True
)
inputs = self.tokenizer(text, return_tensors="pt").to(self.model.device)
with __import__("torch").no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=1024,
temperature=0.7,
top_p=0.9,
do_sample=True,
)
response = self.tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=False)
# Parse response for tool calls or plain text
tool_calls = self._parse_tool_calls(response)
if tool_calls:
# Add assistant message with tool calls
messages.append({"role": "assistant", "tool_calls": tool_calls})
# Execute each tool call
for tc in tool_calls:
func_name = tc["function"]["name"]
try:
args = json.loads(tc["function"]["arguments"])
except:
args = {}
print(f"\n š§ Calling: {func_name}")
if func_name in TOOL_MAP:
result = TOOL_MAP[func_name](args)
else:
result = json.dumps({"error": f"Unknown tool: {func_name}"})
messages.append({"role": "tool", "name": func_name, "content": result})
print(f" ā
Result: {result[:200]}...")
else:
# Plain text response - agent is done
clean = self._clean_response(response)
messages.append({"role": "assistant", "content": clean})
print(f"\nš¤ Agent: {clean}")
break
return messages
def _parse_tool_calls(self, response: str) -> list:
"""Parse tool calls from model output."""
tool_calls = []
# Qwen tool call format: {"name": "...", "arguments": {...}}
pattern = r'\s*(\{.*?\})\s*'
matches = re.findall(pattern, response, re.DOTALL)
for match in matches:
try:
data = json.loads(match)
tool_calls.append({
"type": "function",
"function": {
"name": data.get("name", ""),
"arguments": json.dumps(data.get("arguments", {}))
}
})
except json.JSONDecodeError:
continue
return tool_calls
def _clean_response(self, response: str) -> str:
"""Clean up model response."""
# Remove special tokens
for token in ["<|endoftext|>", "<|im_end|>", "<|im_start|>"]:
response = response.replace(token, "")
return response.strip()
# ============================================================
# CLI
# ============================================================
def main():
parser = argparse.ArgumentParser(description="ViralCut Agent - AI Video Editor")
parser.add_argument("--video", type=str, help="Path to raw video file")
parser.add_argument("--platform", type=str, default="tiktok",
choices=["tiktok", "instagram", "youtube"],
help="Target platform")
parser.add_argument("--niche", type=str, default="", help="Content niche")
parser.add_argument("--plan", action="store_true", help="Generate content plan only (no video needed)")
parser.add_argument("--model", type=str, default="ryu34/viralcut-agent", help="Model ID")
parser.add_argument("--check-slop", type=str, nargs="+", help="Check files for AI-generated content")
args = parser.parse_args()
if args.check_slop:
# Quick AI slop check without loading the full model
for f in args.check_slop:
result = AIDetector.detect(f, "video")
print(json.dumps(json.loads(result), indent=2))
return
agent = ViralCutAgent(model_id=args.model)
if args.plan:
niche = args.niche or "general"
agent.run(f"Research current {args.platform} trends for the '{niche}' niche and create a detailed 7-day content plan with hooks, posting times, and viral strategies.")
elif args.video:
if not os.path.exists(args.video):
print(f"Error: Video file not found: {args.video}")
sys.exit(1)
niche_str = f" in the {args.niche} niche" if args.niche else ""
agent.run(f"I have raw footage at {args.video}. Transform it into a professional, viral {args.platform} video{niche_str}. Analyze it, find the best moments, add trending music, professional edits, and optimize for maximum engagement.")
else:
# Interactive mode
print("ViralCut Agent - Interactive Mode")
print("Type your request (or 'quit' to exit):\n")
while True:
try:
user_input = input("You: ").strip()
if user_input.lower() in ("quit", "exit", "q"):
break
if user_input:
agent.run(user_input)
except (KeyboardInterrupt, EOFError):
break
if __name__ == "__main__":
main()