# C:\Users\bahae\.gemini\antigravity\scratch\verivid-ai\hf_space\app\services\pipeline.py
| """ | |
| Professional Analysis Pipeline with Zero-Storage Streaming | |
| ========================================================== | |
| Integration of Visual, Audio, and Content engines. | |
| """ | |

import os
import json
import hashlib
from datetime import datetime

from app.services.downloader import (
    get_video_info,
    clean_temp,
    stream_extract_frames,
    stream_extract_audio,
    extract_frames,
    extract_audio,
    is_youtube_url,
    download_video,
    download_youtube_thumbnail
)
from app.services.local_signals import analyze_metadata, analyze_heuristics, analyze_content
from app.services.hf_inference import analyze_visual_fallback, analyze_audio_ai
from app.services.sightengine import analyze_frames_with_sightengine
from app.core.scoring import calculate_risk
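
# ---------------------------------------------------------------------------
# Result cache: one JSON file per analyzed URL, keyed by the MD5 of the URL
# and treated as fresh for 24 hours (see get_cached_result).
# ---------------------------------------------------------------------------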
CACHE_DIR = os.path.join(os.path.dirname(__file__), '..', '..', 'cache')


def get_cache_key(url: str) -> str:
    # MD5 is used only as a stable cache filename, not for security.
    return hashlib.md5(url.encode()).hexdigest()


def get_cached_result(url: str):
    """Return the cached result for `url` if it is less than 24 hours old."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    cache_file = os.path.join(CACHE_DIR, f"{get_cache_key(url)}.json")
    if os.path.exists(cache_file):
        try:
            with open(cache_file, 'r') as f:
                data = json.load(f)
            cached_time = datetime.fromisoformat(data.get('cached_at', '2000-01-01'))
            if (datetime.now() - cached_time).total_seconds() < 86400:  # 24h TTL
                return data.get('result')
        except Exception:
            # Treat an unreadable or corrupt cache entry as a cache miss.
            pass
    return None


def save_to_cache(url: str, result: dict):
    """Persist `result` for `url`; caching is best-effort and non-fatal."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    cache_file = os.path.join(CACHE_DIR, f"{get_cache_key(url)}.json")
    try:
        with open(cache_file, 'w') as f:
            json.dump({'cached_at': datetime.now().isoformat(), 'url': url, 'result': result}, f)
    except Exception:
        # A failed cache write should never break the analysis itself.
        pass
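

# ---------------------------------------------------------------------------
# Pipeline overview
# ---------------------------------------------------------------------------
# 1. Cache lookup (URL analyses only).
# 2. Frame/audio acquisition: zero-storage streaming first, then a full
#    download, then a YouTube-thumbnail fallback.
# 3. Signal extraction: visual (SightEngine with a HuggingFace fallback),
#    audio, metadata, heuristics, and content.
# 4. Risk scoring, result assembly, caching, and temp cleanup.
# ---------------------------------------------------------------------------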


async def run_analysis_pipeline(job_id: str, url: str, uploaded_file_path: str, jobs_db: dict):
    """
    Main analysis pipeline.

    URL inputs use zero-storage streaming where possible, falling back to a
    full download and, for blocked YouTube videos, a thumbnail-only pass.
    Uploaded files are analyzed directly from disk.
    """
    print(f"[{job_id}] Starting analysis for: {url or uploaded_file_path}")
    jobs_db[job_id]["status"] = "processing"

    try:
        # Check cache (URL analyses only)
        if url:
            cached = get_cached_result(url)
            if cached:
                cached['id'] = job_id
                jobs_db[job_id] = {"status": "completed", "result": cached}
                return

        # Get video info
        video_info = None
        if url:
            video_info = get_video_info(url)
        if not video_info:
            video_info = {"thumbnail": None, "title": "Unknown"}

        frame_paths = []
        audio_path = None
        video_path = None
        thumbnail_only = False
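
        # Acquisition strategy for URL inputs, in order of preference:
        #   1. Zero-storage streaming (no full file ever touches disk)
        #   2. Full download, then local extraction
        #   3. YouTube thumbnail only (when the video itself is blocked)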
        # PATH A: URL
        if url and not uploaded_file_path:
            print(f"[{job_id}] Attempting stream extraction...")
            frame_paths = stream_extract_frames(url, job_id, max_frames=8, duration=30)
            if not frame_paths:
                print(f"[{job_id}] Stream extraction failed, attempting full download...")
                video_path = download_video(url, job_id)
                if video_path and os.path.exists(video_path):
                    print(f"[{job_id}] Download successful, extracting frames from file...")
                    frame_paths = extract_frames(video_path, job_id, fps=0.5, max_frames=8)
                    audio_path = extract_audio(video_path, job_id)
                elif is_youtube_url(url):
                    print(f"[{job_id}] YouTube video blocked, attempting thumbnail fallback...")
                    frame_paths = download_youtube_thumbnail(url, job_id)
                    thumbnail_only = True
                else:
                    msg = "Could not download video or extract frames (all layers failed)"
                    print(f"[{job_id}] ERROR: {msg}")
                    jobs_db[job_id] = {"status": "failed", "error": msg}
                    return
            else:
                print(f"[{job_id}] Stream extraction successful, extracting audio stream...")
                audio_path = stream_extract_audio(url, job_id, duration=30)

        # PATH B: Upload
        elif uploaded_file_path and os.path.exists(uploaded_file_path):
            video_path = uploaded_file_path
            frame_paths = extract_frames(video_path, job_id, fps=0.5, max_frames=8)
            audio_path = extract_audio(video_path, job_id)

        # ANALYSIS
        # Visual - Layer 1: SightEngine (Primary)
        if not frame_paths:
            print(f"[{job_id}] ERROR: No frames extracted for analysis.")
            visual = {
                "avg_prob": 0,
                "max_prob": 0,
                "frame_count": 0,
                "details": "No frames available for analysis (download or extraction failed)",
                "source": "None"
            }
        else:
            print(f"[{job_id}] Layer 1: SightEngine visual analysis")
            se_result = analyze_frames_with_sightengine(frame_paths)
            visual = {
                "avg_prob": 0,
                "max_prob": 0,
                "frame_count": 0,
                "details": "",
                "source": "SightEngine"
            }
            if se_result.get("frame_count", 0) > 0:
                visual["avg_prob"] = se_result["avg_score"]
                visual["max_prob"] = se_result["max_score"]
                visual["frame_count"] = se_result["frame_count"]
                visual["details"] = se_result["details"]
            else:
                # Layer 2: HuggingFace Fallback
                print(f"[{job_id}] Layer 2: Falling back to HuggingFace visual engine")
                visual_result = analyze_visual_fallback(frame_paths)
                visual["avg_prob"] = visual_result["avg_prob"]
                visual["max_prob"] = visual_result["max_prob"]
                visual["frame_count"] = visual_result["frame_count"]
                visual["details"] = f"Fallback: {visual_result['details']}"
                visual["source"] = "HuggingFace Fallback"

        # Audio
        audio = analyze_audio_ai(video_path, audio_path=audio_path)

        # Metadata & Heuristics
        meta = analyze_metadata(video_path, video_info=video_info)
        heuristics = analyze_heuristics(video_path, meta, video_info=video_info)

        # Content Analysis
        content = analyze_content(video_info=video_info)

        # Scoring: fuse all five signal groups into score/confidence/recommendation
        signals = {
            "visual": visual,
            "audio": audio,
            "metadata": meta,
            "heuristics": heuristics,
            "content": content
        }
        score, confidence, rec = calculate_risk(signals)

        # Build explanation
        if thumbnail_only:
            explanation = f"⚠️ Thumbnail-only analysis. Risk score: {score}/100 ({rec}). {confidence} confidence."
        else:
            explanation = f"Analyzed {len(frame_paths)} sampled frames plus audio signals. Risk score: {score}/100 ({rec}). {confidence} confidence."

        result = {
            "score": score,
            "confidence": confidence,
            "recommendation": rec,
            "signals": signals,
            "thumbnail_only": thumbnail_only,
            "video_info": {
                "title": video_info.get("title", "Unknown"),
                "duration": video_info.get("duration"),
                "resolution": f"{video_info.get('width', '?')}x{video_info.get('height', '?')}"
            },
            "explanation": explanation,
            "disclaimer": "AI detection is probabilistic."
        }

        if url:
            save_to_cache(url, result)

        clean_temp(job_id)
        result['id'] = job_id
        jobs_db[job_id] = {"status": "completed", "result": result}

    except Exception as e:
        print(f"[{job_id}] Pipeline failure: {e}")
        jobs_db[job_id] = {"status": "failed", "error": str(e)}
        clean_temp(job_id)
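

# ---------------------------------------------------------------------------
# Minimal local smoke test: a hypothetical sketch, not part of the service.
# It assumes the app package is importable and that the external engines
# (SightEngine / HuggingFace) are reachable; the URL below is a placeholder.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio
    import uuid

    jobs_db: dict = {}
    job_id = str(uuid.uuid4())
    jobs_db[job_id] = {"status": "queued"}

    asyncio.run(run_analysis_pipeline(
        job_id=job_id,
        url="https://example.com/sample-video",  # placeholder input
        uploaded_file_path=None,
        jobs_db=jobs_db,
    ))
    print(json.dumps(jobs_db[job_id], indent=2, default=str))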