# C:\Users\bahae\.gemini\antigravity\scratch\verivid-ai\hf_space\app\services\pipeline.py
"""
Professional Analysis Pipeline with Zero-Storage Streaming
==========================================================
Integration of Visual, Audio, and Content engines.
"""
import os
import json
import hashlib
import shutil
from datetime import datetime
from app.services.downloader import (
    get_video_info,
    clean_temp,
    stream_extract_frames,
    stream_extract_audio,
    extract_frames,
    extract_audio,
    is_youtube_url,
    download_video,
    download_youtube_thumbnail
)
from app.services.local_signals import analyze_metadata, analyze_heuristics, analyze_content
from app.services.hf_inference import analyze_visual_fallback, analyze_audio_ai
from app.services.sightengine import analyze_frames_with_sightengine
from app.core.scoring import calculate_risk
# Cache
CACHE_DIR = os.path.join(os.path.dirname(__file__), '..', '..', 'cache')
def get_cache_key(url: str) -> str:
    """Stable cache filename stem derived from the URL (MD5 of its UTF-8 bytes)."""
    return hashlib.md5(url.encode()).hexdigest()
def get_cached_result(url: str):
    """Return the cached result for a URL if it is less than 24 hours old, else None."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    cache_file = os.path.join(CACHE_DIR, f"{get_cache_key(url)}.json")
    if os.path.exists(cache_file):
        try:
            with open(cache_file, 'r') as f:
                data = json.load(f)
            cached_time = datetime.fromisoformat(data.get('cached_at', '2000-01-01'))
            if (datetime.now() - cached_time).total_seconds() < 86400:  # 24-hour TTL
                return data.get('result')
        except Exception:
            # A corrupt or unreadable cache entry is treated as a cache miss.
            pass
    return None
def save_to_cache(url: str, result: dict):
    """Persist a result to the on-disk cache with a timestamp for TTL checks."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    cache_file = os.path.join(CACHE_DIR, f"{get_cache_key(url)}.json")
    try:
        with open(cache_file, 'w') as f:
            json.dump({'cached_at': datetime.now().isoformat(), 'url': url, 'result': result}, f)
    except Exception:
        # Caching is best-effort; failures must not break the pipeline.
        pass
async def run_analysis_pipeline(job_id: str, url: str, uploaded_file_path: str, jobs_db: dict):
    """
    Main analysis pipeline with ZERO-STORAGE streaming for URL analysis.
    """
    print(f"[{job_id}] Starting analysis for URL: {url}")
    jobs_db[job_id]["status"] = "processing"
    try:
        # Check cache
        if url:
            cached = get_cached_result(url)
            if cached:
                cached['id'] = job_id
                jobs_db[job_id] = {"status": "completed", "result": cached}
                return
        # Get video info
        video_info = None
        if url:
            video_info = get_video_info(url)
        if not video_info:
            video_info = {"thumbnail": None, "title": "Unknown"}
        frame_paths = []
        audio_path = None
        video_path = None
        thumbnail_only = False
        # PATH A: URL
        if url and not uploaded_file_path:
            print(f"[{job_id}] Attempting stream extraction...")
            frame_paths = stream_extract_frames(url, job_id, max_frames=8, duration=30)
            if not frame_paths:
                print(f"[{job_id}] Stream extraction failed, attempting full download...")
                video_path = download_video(url, job_id)
                if video_path and os.path.exists(video_path):
                    print(f"[{job_id}] Download successful, extracting frames from file...")
                    frame_paths = extract_frames(video_path, job_id, fps=0.5, max_frames=8)
                    audio_path = extract_audio(video_path, job_id)
                elif is_youtube_url(url):
                    print(f"[{job_id}] YouTube video blocked, attempting thumbnail fallback...")
                    frame_paths = download_youtube_thumbnail(url, job_id)
                    thumbnail_only = True
                else:
                    msg = "Could not download video or extract frames (All layers failed)"
                    print(f"[{job_id}] ERROR: {msg}")
                    jobs_db[job_id] = {"status": "failed", "error": msg}
                    return
            else:
                print(f"[{job_id}] Stream extraction successful, extracting audio stream...")
                audio_path = stream_extract_audio(url, job_id, duration=30)
        # PATH B: Upload
        elif uploaded_file_path and os.path.exists(uploaded_file_path):
            video_path = uploaded_file_path
            frame_paths = extract_frames(video_path, job_id, fps=0.5, max_frames=8)
            audio_path = extract_audio(video_path, job_id)
        # ANALYSIS
        # Visual - Layer 1: SightEngine (Primary)
        if not frame_paths:
            print(f"[{job_id}] ERROR: No frames extracted for analysis.")
            visual = {
                "avg_prob": 0,
                "max_prob": 0,
                "frame_count": 0,
                "details": "No frames available for analysis (download or extraction failed)",
                "source": "None"
            }
        else:
            print(f"[{job_id}] Layer 1: SightEngine visual analysis")
            se_result = analyze_frames_with_sightengine(frame_paths)
            visual = {
                "avg_prob": 0,
                "max_prob": 0,
                "frame_count": 0,
                "details": "",
                "source": "SightEngine"
            }
            if se_result.get("frame_count", 0) > 0:
                visual["avg_prob"] = se_result["avg_score"]
                visual["max_prob"] = se_result["max_score"]
                visual["frame_count"] = se_result["frame_count"]
                visual["details"] = se_result["details"]
            else:
                # Layer 2: HuggingFace Fallback
                print(f"[{job_id}] Layer 2: Falling back to HuggingFace visual engine")
                visual_result = analyze_visual_fallback(frame_paths)
                visual["avg_prob"] = visual_result["avg_prob"]
                visual["max_prob"] = visual_result["max_prob"]
                visual["frame_count"] = visual_result["frame_count"]
                visual["details"] = f"Fallback: {visual_result['details']}"
                visual["source"] = "HuggingFace Fallback"
        # Audio
        audio = analyze_audio_ai(video_path, audio_path=audio_path)
        # Metadata & Heuristics
        meta = analyze_metadata(video_path, video_info=video_info)
        heuristics = analyze_heuristics(video_path, meta, video_info=video_info)
        # Content Analysis (New)
        content = analyze_content(video_info=video_info)
        # Scoring
        signals = {
            "visual": visual,
            "audio": audio,
            "metadata": meta,
            "heuristics": heuristics,
            "content": content
        }
        score, confidence, rec = calculate_risk(signals)
        # Build explanation
        if thumbnail_only:
            explanation = f"⚠️ Thumbnail-only analysis. Risk score: {score}/100 ({rec}). {confidence} confidence."
        else:
            explanation = f"Extensive analysis of {len(frame_paths)} frames and audio signals. Risk score: {score}/100 ({rec}). {confidence} confidence."
        result = {
            "score": score,
            "confidence": confidence,
            "recommendation": rec,
            "signals": signals,
            "thumbnail_only": thumbnail_only,
            "video_info": {
                "title": video_info.get("title", "Unknown"),
                "duration": video_info.get("duration"),
                "resolution": f"{video_info.get('width', '?')}x{video_info.get('height', '?')}"
            },
            "explanation": explanation,
            "disclaimer": "AI detection is probabilistic."
        }
        if url:
            save_to_cache(url, result)
        clean_temp(job_id)
        result['id'] = job_id
        jobs_db[job_id] = {"status": "completed", "result": result}
    except Exception as e:
        print(f"[{job_id}] Pipeline failure: {e}")
        jobs_db[job_id] = {"status": "failed", "error": str(e)}
        clean_temp(job_id)
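

# ---------------------------------------------------------------------------
# Local smoke test (illustrative only).
# A minimal sketch of how this pipeline might be driven outside the API layer,
# assuming an in-memory jobs_db dict and a placeholder test URL; the actual
# service is expected to schedule run_analysis_pipeline as a background task
# and poll jobs_db for the result. The URL below is a stand-in, not a real
# endpoint from this project.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio
    import uuid

    jobs_db = {}
    job_id = str(uuid.uuid4())
    # Seed the job entry the same way the API layer would before scheduling.
    jobs_db[job_id] = {"status": "queued"}

    test_url = "https://example.com/sample.mp4"  # hypothetical URL, replace locally
    asyncio.run(run_analysis_pipeline(job_id, test_url, uploaded_file_path=None, jobs_db=jobs_db))

    # Dump whatever the pipeline recorded (completed result or failure details).
    print(json.dumps(jobs_db[job_id], indent=2, ensure_ascii=False, default=str))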