Spotix-API / backend /app /services /video_processor.py
Anish-530
[Deployment Phase 1 & 2] - Neon DB and R2 with working turnstile in login page implemented
0edd56d
import time
import logging
import os
import json
from sqlalchemy.orm import Session
from app.models.file_model import File
from app.ai.video.frame_extractor import extract_frames
from app.ai.video.frame_detector import analyze_frame
from app.ai.video.motion_detector import compute_motion_anomaly
from app.ai.video.noise_entropy_detector import compute_noise_entropy_anomaly
from app.ai.video.metadata_analyzer import analyze_metadata
from app.ai.video.aggregator import aggregate_scores
from app.ai.video.diffusion_spectrum_analyzer import compute_diffusion_spectrum_anomaly
from app.ai.video.keyframe_heatmap import generate_video_anomaly_keyframe
from app.ai.video.anomaly_gif_maker import generate_anomaly_gif
from app.core.storage import active_storage
import tempfile
logger = logging.getLogger(__name__)
def process_video_pipeline(file_id: int, file_path: str, db: Session):
start_time = time.time()
logger.info(f"Starting Video Pipeline for file_id: {file_id}")
try:
db_file = db.query(File).filter(File.id == file_id).first()
if not db_file:
logger.error(f"File ID {file_id} not found in DB.")
return
local_path = active_storage.download_to_temp(file_path)
md_score, md_dict = analyze_metadata(local_path)
frame_scores = []
motion_scores = []
noise_scores = []
diffusion_scores = []
previous_frame = None
highest_ai_score = 0
most_suspect_frame_idx = 0
all_extracted_frames = []
timeline_data = []
for frame, timestamp_sec in extract_frames(local_path, sample_rate=15, max_frames=50):
all_extracted_frames.append(frame.copy())
# Module A: ViT Image Forensic Analysis
f_score = analyze_frame(frame)
frame_scores.append(f_score)
# Add this specific second to the UI Scrubber Data
timeline_data.append({"time": round(timestamp_sec, 2), "ai_score": round(f_score, 3)})
if f_score > highest_ai_score:
highest_ai_score = f_score
most_suspect_frame_idx = len(all_extracted_frames) - 1
# Module B: Microscopic Silicon Noise Analysis
n_score = compute_noise_entropy_anomaly(frame)
if n_score is not None:
noise_scores.append(n_score)
# Module C: Diffusion Spectrum Analysis (FFT)
d_score = compute_diffusion_spectrum_anomaly(frame)
diffusion_scores.append(d_score)
# Module D: Farneback Optical Flow
if previous_frame is not None:
m_score = compute_motion_anomaly(previous_frame, frame)
if m_score is not None:
motion_scores.append(m_score)
previous_frame = frame
if all_extracted_frames:
# 1. Generate the 2-second GIF Clip
gif_path = generate_anomaly_gif(all_extracted_frames, most_suspect_frame_idx)
# Upload GIF to R2
class MockFile:
def __init__(self, f):
self.file = f
self.content_type = "image/gif"
with open(gif_path, "rb") as hf:
mock_hf = MockFile(hf)
import uuid
safe_gif_name = f"{uuid.uuid4().hex}.gif"
r2_gif_key = active_storage.save(mock_hf, f"heatmaps/{safe_gif_name}")
db_file.heatmap_path = r2_gif_key # Overwriting the static image column with the GIF file!
# clean up local gif
if os.path.exists(gif_path):
os.remove(gif_path)
# 2. Save the JSON string for the Line Chart Scrubber
db_file.timeline_data = json.dumps(timeline_data)
avg_motion = sum(motion_scores)/len(motion_scores) if motion_scores else None
avg_noise = sum(noise_scores)/len(noise_scores) if noise_scores else None
avg_diffusion = sum(diffusion_scores)/len(diffusion_scores) if diffusion_scores else 0.0
final_verdict = aggregate_scores(
frame_scores=frame_scores,
motion_score=avg_motion,
noise_score=avg_noise,
diffusion_score=avg_diffusion,
metadata_score=md_score
)
db_file.status = "Completed"
db_file.result = f"{final_verdict['label']} ({final_verdict['probability']*100:.1f}%)"
db_file.confidence = final_verdict['probability']
db_file.ai_explanation = final_verdict['explanation']
db.commit()
elapsed = time.time() - start_time
logger.info(f"Successfully processed video {file_id} in {elapsed:.2f} seconds.")
logger.info(f"Verdict: {db_file.result}")
except Exception as e:
logger.error(f"FATAL Pipeline Crash for file_id {file_id}: {str(e)}")
try:
db_file = db.query(File).filter(File.id == file_id).first()
if db_file:
db_file.status = "Failed"
db_file.result = str(e)
db.commit()
except:
pass
finally:
if 'local_path' in locals() and os.path.exists(local_path) and getattr(active_storage, '__class__').__name__ == "R2StorageProvider":
os.remove(local_path)