import time import logging import os import json from sqlalchemy.orm import Session from app.models.file_model import File from app.ai.video.frame_extractor import extract_frames from app.ai.video.frame_detector import analyze_frame from app.ai.video.motion_detector import compute_motion_anomaly from app.ai.video.noise_entropy_detector import compute_noise_entropy_anomaly from app.ai.video.metadata_analyzer import analyze_metadata from app.ai.video.aggregator import aggregate_scores from app.ai.video.diffusion_spectrum_analyzer import compute_diffusion_spectrum_anomaly from app.ai.video.keyframe_heatmap import generate_video_anomaly_keyframe from app.ai.video.anomaly_gif_maker import generate_anomaly_gif from app.core.storage import active_storage import tempfile logger = logging.getLogger(__name__) def process_video_pipeline(file_id: int, file_path: str, db: Session): start_time = time.time() logger.info(f"Starting Video Pipeline for file_id: {file_id}") try: db_file = db.query(File).filter(File.id == file_id).first() if not db_file: logger.error(f"File ID {file_id} not found in DB.") return local_path = active_storage.download_to_temp(file_path) md_score, md_dict = analyze_metadata(local_path) frame_scores = [] motion_scores = [] noise_scores = [] diffusion_scores = [] previous_frame = None highest_ai_score = 0 most_suspect_frame_idx = 0 all_extracted_frames = [] timeline_data = [] for frame, timestamp_sec in extract_frames(local_path, sample_rate=15, max_frames=50): all_extracted_frames.append(frame.copy()) # Module A: ViT Image Forensic Analysis f_score = analyze_frame(frame) frame_scores.append(f_score) # Add this specific second to the UI Scrubber Data timeline_data.append({"time": round(timestamp_sec, 2), "ai_score": round(f_score, 3)}) if f_score > highest_ai_score: highest_ai_score = f_score most_suspect_frame_idx = len(all_extracted_frames) - 1 # Module B: Microscopic Silicon Noise Analysis n_score = compute_noise_entropy_anomaly(frame) if n_score is not None: noise_scores.append(n_score) # Module C: Diffusion Spectrum Analysis (FFT) d_score = compute_diffusion_spectrum_anomaly(frame) diffusion_scores.append(d_score) # Module D: Farneback Optical Flow if previous_frame is not None: m_score = compute_motion_anomaly(previous_frame, frame) if m_score is not None: motion_scores.append(m_score) previous_frame = frame if all_extracted_frames: # 1. Generate the 2-second GIF Clip gif_path = generate_anomaly_gif(all_extracted_frames, most_suspect_frame_idx) # Upload GIF to R2 class MockFile: def __init__(self, f): self.file = f self.content_type = "image/gif" with open(gif_path, "rb") as hf: mock_hf = MockFile(hf) import uuid safe_gif_name = f"{uuid.uuid4().hex}.gif" r2_gif_key = active_storage.save(mock_hf, f"heatmaps/{safe_gif_name}") db_file.heatmap_path = r2_gif_key # Overwriting the static image column with the GIF file! # clean up local gif if os.path.exists(gif_path): os.remove(gif_path) # 2. Save the JSON string for the Line Chart Scrubber db_file.timeline_data = json.dumps(timeline_data) avg_motion = sum(motion_scores)/len(motion_scores) if motion_scores else None avg_noise = sum(noise_scores)/len(noise_scores) if noise_scores else None avg_diffusion = sum(diffusion_scores)/len(diffusion_scores) if diffusion_scores else 0.0 final_verdict = aggregate_scores( frame_scores=frame_scores, motion_score=avg_motion, noise_score=avg_noise, diffusion_score=avg_diffusion, metadata_score=md_score ) db_file.status = "Completed" db_file.result = f"{final_verdict['label']} ({final_verdict['probability']*100:.1f}%)" db_file.confidence = final_verdict['probability'] db_file.ai_explanation = final_verdict['explanation'] db.commit() elapsed = time.time() - start_time logger.info(f"Successfully processed video {file_id} in {elapsed:.2f} seconds.") logger.info(f"Verdict: {db_file.result}") except Exception as e: logger.error(f"FATAL Pipeline Crash for file_id {file_id}: {str(e)}") try: db_file = db.query(File).filter(File.id == file_id).first() if db_file: db_file.status = "Failed" db_file.result = str(e) db.commit() except: pass finally: if 'local_path' in locals() and os.path.exists(local_path) and getattr(active_storage, '__class__').__name__ == "R2StorageProvider": os.remove(local_path)