Spaces:
Running
Running
File size: 5,624 Bytes
0edd56d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | import time
import logging
import os
import json
from sqlalchemy.orm import Session
from app.models.file_model import File
from app.ai.video.frame_extractor import extract_frames
from app.ai.video.frame_detector import analyze_frame
from app.ai.video.motion_detector import compute_motion_anomaly
from app.ai.video.noise_entropy_detector import compute_noise_entropy_anomaly
from app.ai.video.metadata_analyzer import analyze_metadata
from app.ai.video.aggregator import aggregate_scores
from app.ai.video.diffusion_spectrum_analyzer import compute_diffusion_spectrum_anomaly
from app.ai.video.keyframe_heatmap import generate_video_anomaly_keyframe
from app.ai.video.anomaly_gif_maker import generate_anomaly_gif
from app.core.storage import active_storage
import tempfile
logger = logging.getLogger(__name__)
def process_video_pipeline(file_id: int, file_path: str, db: Session):
start_time = time.time()
logger.info(f"Starting Video Pipeline for file_id: {file_id}")
try:
db_file = db.query(File).filter(File.id == file_id).first()
if not db_file:
logger.error(f"File ID {file_id} not found in DB.")
return
local_path = active_storage.download_to_temp(file_path)
md_score, md_dict = analyze_metadata(local_path)
frame_scores = []
motion_scores = []
noise_scores = []
diffusion_scores = []
previous_frame = None
highest_ai_score = 0
most_suspect_frame_idx = 0
all_extracted_frames = []
timeline_data = []
for frame, timestamp_sec in extract_frames(local_path, sample_rate=15, max_frames=50):
all_extracted_frames.append(frame.copy())
# Module A: ViT Image Forensic Analysis
f_score = analyze_frame(frame)
frame_scores.append(f_score)
# Add this specific second to the UI Scrubber Data
timeline_data.append({"time": round(timestamp_sec, 2), "ai_score": round(f_score, 3)})
if f_score > highest_ai_score:
highest_ai_score = f_score
most_suspect_frame_idx = len(all_extracted_frames) - 1
# Module B: Microscopic Silicon Noise Analysis
n_score = compute_noise_entropy_anomaly(frame)
if n_score is not None:
noise_scores.append(n_score)
# Module C: Diffusion Spectrum Analysis (FFT)
d_score = compute_diffusion_spectrum_anomaly(frame)
diffusion_scores.append(d_score)
# Module D: Farneback Optical Flow
if previous_frame is not None:
m_score = compute_motion_anomaly(previous_frame, frame)
if m_score is not None:
motion_scores.append(m_score)
previous_frame = frame
if all_extracted_frames:
# 1. Generate the 2-second GIF Clip
gif_path = generate_anomaly_gif(all_extracted_frames, most_suspect_frame_idx)
# Upload GIF to R2
class MockFile:
def __init__(self, f):
self.file = f
self.content_type = "image/gif"
with open(gif_path, "rb") as hf:
mock_hf = MockFile(hf)
import uuid
safe_gif_name = f"{uuid.uuid4().hex}.gif"
r2_gif_key = active_storage.save(mock_hf, f"heatmaps/{safe_gif_name}")
db_file.heatmap_path = r2_gif_key # Overwriting the static image column with the GIF file!
# clean up local gif
if os.path.exists(gif_path):
os.remove(gif_path)
# 2. Save the JSON string for the Line Chart Scrubber
db_file.timeline_data = json.dumps(timeline_data)
avg_motion = sum(motion_scores)/len(motion_scores) if motion_scores else None
avg_noise = sum(noise_scores)/len(noise_scores) if noise_scores else None
avg_diffusion = sum(diffusion_scores)/len(diffusion_scores) if diffusion_scores else 0.0
final_verdict = aggregate_scores(
frame_scores=frame_scores,
motion_score=avg_motion,
noise_score=avg_noise,
diffusion_score=avg_diffusion,
metadata_score=md_score
)
db_file.status = "Completed"
db_file.result = f"{final_verdict['label']} ({final_verdict['probability']*100:.1f}%)"
db_file.confidence = final_verdict['probability']
db_file.ai_explanation = final_verdict['explanation']
db.commit()
elapsed = time.time() - start_time
logger.info(f"Successfully processed video {file_id} in {elapsed:.2f} seconds.")
logger.info(f"Verdict: {db_file.result}")
except Exception as e:
logger.error(f"FATAL Pipeline Crash for file_id {file_id}: {str(e)}")
try:
db_file = db.query(File).filter(File.id == file_id).first()
if db_file:
db_file.status = "Failed"
db_file.result = str(e)
db.commit()
except:
pass
finally:
if 'local_path' in locals() and os.path.exists(local_path) and getattr(active_storage, '__class__').__name__ == "R2StorageProvider":
os.remove(local_path)
|